Commit c028ea24 authored by Jim Fulton's avatar Jim Fulton

tag

parents 16dd2c12 1a896e9c
...@@ -37,6 +37,8 @@ New Features ...@@ -37,6 +37,8 @@ New Features
- Multi-version concurrency control and iteration - Multi-version concurrency control and iteration
- Explicit support dfor demo-storage stacking via push and pop methods.
- Wen calling ZODB.DB to create a database, you can now pass a file - Wen calling ZODB.DB to create a database, you can now pass a file
name, rather than a storage to use a file storage. name, rather than a storage to use a file storage.
......
...@@ -103,22 +103,21 @@ class AbstractConnectionPool(object): ...@@ -103,22 +103,21 @@ class AbstractConnectionPool(object):
def getTimeout(self): def getTimeout(self):
return self._timeout return self._timeout
timeout = property(getTimeout, setTimeout) timeout = property(getTimeout, lambda self, v: self.setTimeout(v))
size = property(getSize, setSize) size = property(getSize, lambda self, v: self.setSize(v))
class ConnectionPool(AbstractConnectionPool): class ConnectionPool(AbstractConnectionPool):
def __init__(self, size, timeout=None): def __init__(self, size, timeout=time()):
super(ConnectionPool, self).__init__(size, timeout) super(ConnectionPool, self).__init__(size, timeout)
# A stack of connections available to hand out. This is a subset # A stack of connections available to hand out. This is a subset
# of self.all. push() and repush() add to this, and may remove # of self.all. push() and repush() add to this, and may remove
# the oldest available connections if the pool is too large. # the oldest available connections if the pool is too large.
# pop() pops this stack. There are never more than size entries # pop() pops this stack. There are never more than size entries
# in this stack. The keys are time.time() values of the push or # in this stack.
# repush calls. self.available = []
self.available = BTrees.OOBTree.Bucket()
def push(self, c): def push(self, c):
"""Register a new available connection. """Register a new available connection.
...@@ -127,10 +126,10 @@ class ConnectionPool(AbstractConnectionPool): ...@@ -127,10 +126,10 @@ class ConnectionPool(AbstractConnectionPool):
stack even if we're over the pool size limit. stack even if we're over the pool size limit.
""" """
assert c not in self.all assert c not in self.all
assert c not in self.available.values() assert c not in self.available
self._reduce_size(strictly_less=True) self._reduce_size(strictly_less=True)
self.all.add(c) self.all.add(c)
self.available[time()] = c self.available.append((time(), c))
n = len(self.all) n = len(self.all)
limit = self.size limit = self.size
if n > limit: if n > limit:
...@@ -147,43 +146,42 @@ class ConnectionPool(AbstractConnectionPool): ...@@ -147,43 +146,42 @@ class ConnectionPool(AbstractConnectionPool):
older available connections. older available connections.
""" """
assert c in self.all assert c in self.all
assert c not in self.available.values() assert c not in self.available
self._reduce_size(strictly_less=True) self._reduce_size(strictly_less=True)
self.available[time()] = c self.available.append((time(), c))
def _reduce_size(self, strictly_less=False): def _reduce_size(self, strictly_less=False):
"""Throw away the oldest available connections until we're under our """Throw away the oldest available connections until we're under our
target size (strictly_less=False, the default) or no more than that target size (strictly_less=False, the default) or no more than that
(strictly_less=True). (strictly_less=True).
""" """
if self.timeout is None: threshhold = time() - self.timeout
threshhold = None
else:
threshhold = time() - self.timeout
target = self.size target = self.size
if strictly_less: if strictly_less:
target -= 1 target -= 1
for t, c in list(self.available.items()):
if (len(self.available) > target or available = self.available
threshhold is not None and t < threshhold): while (
del self.available[t] (len(available) > target)
self.all.remove(c) or
# While application code may still hold a reference to `c`, (available and available[0][0] < threshhold)
# there's little useful that can be done with this Connection ):
# anymore. Its cache may be holding on to limited resources, t, c = available.pop(0)
# and we replace the cache with an empty one now so that we self.all.remove(c)
# don't have to wait for gc to reclaim it. Note that it's not # While application code may still hold a reference to `c`,
# possible for DB.open() to return `c` again: `c` can never be # there's little useful that can be done with this Connection
# in an open state again. # anymore. Its cache may be holding on to limited resources,
# TODO: Perhaps it would be better to break the reference # and we replace the cache with an empty one now so that we
# cycles between `c` and `c._cache`, so that refcounting # don't have to wait for gc to reclaim it. Note that it's not
# reclaims both right now. But if user code _does_ have a # possible for DB.open() to return `c` again: `c` can never be
# strong reference to `c` now, breaking the cycle would not # in an open state again.
# reclaim `c` now, and `c` would be left in a user-visible # TODO: Perhaps it would be better to break the reference
# crazy state. # cycles between `c` and `c._cache`, so that refcounting
c._resetCache() # reclaims both right now. But if user code _does_ have a
else: # strong reference to `c` now, breaking the cycle would not
break # reclaim `c` now, and `c` would be left in a user-visible
# crazy state.
c._resetCache()
def reduce_size(self): def reduce_size(self):
self._reduce_size() self._reduce_size()
...@@ -197,7 +195,7 @@ class ConnectionPool(AbstractConnectionPool): ...@@ -197,7 +195,7 @@ class ConnectionPool(AbstractConnectionPool):
""" """
result = None result = None
if self.available: if self.available:
result = self.available.pop(self.available.maxKey()) _, result = self.available.pop()
# Leave it in self.all, so we can still get at it for statistics # Leave it in self.all, so we can still get at it for statistics
# while it's alive. # while it's alive.
assert result in self.all assert result in self.all
...@@ -212,19 +210,15 @@ class ConnectionPool(AbstractConnectionPool): ...@@ -212,19 +210,15 @@ class ConnectionPool(AbstractConnectionPool):
If a connection is no longer viable because it has timed out, it is If a connection is no longer viable because it has timed out, it is
garbage collected.""" garbage collected."""
if self.timeout is None: threshhold = time() - self.timeout
threshhold = None for t, c in list(self.available):
else: if t < threshhold:
threshhold = time() - self.timeout
for t, c in tuple(self.available.items()):
if threshhold is not None and t < threshhold:
del self.available[t] del self.available[t]
self.all.remove(c) self.all.remove(c)
c._resetCache() c._resetCache()
else: else:
c.cacheGC() c.cacheGC()
class KeyedConnectionPool(AbstractConnectionPool): class KeyedConnectionPool(AbstractConnectionPool):
# this pool keeps track of keyed connections all together. It makes # this pool keeps track of keyed connections all together. It makes
# it possible to make assertions about total numbers of keyed connections. # it possible to make assertions about total numbers of keyed connections.
...@@ -233,99 +227,68 @@ class KeyedConnectionPool(AbstractConnectionPool): ...@@ -233,99 +227,68 @@ class KeyedConnectionPool(AbstractConnectionPool):
# see the comments in ConnectionPool for method descriptions. # see the comments in ConnectionPool for method descriptions.
def __init__(self, size, timeout=None): def __init__(self, size, timeout=time()):
super(KeyedConnectionPool, self).__init__(size, timeout) super(KeyedConnectionPool, self).__init__(size, timeout)
# key: {time.time: connection} self.pools = {}
self.available = BTrees.family32.OO.Bucket()
# time.time: key def setSize(self, v):
self.closed = BTrees.family32.OO.Bucket() self._size = v
for pool in self.pools.values():
pool.setSize(v)
def setTimeout(self, v):
self._timeout = v
for pool in self.pools.values():
pool.setTimeout(v)
def push(self, c, key): def push(self, c, key):
assert c not in self.all pool = self.pools.get(key)
available = self.available.get(key) if pool is None:
if available is None: pool = self.pools[key] = ConnectionPool(self.size, self.timeout)
available = self.available[key] = BTrees.family32.OO.Bucket() pool.push(c)
else:
assert c not in available.values()
self._reduce_size(strictly_less=True)
self.all.add(c)
t = time()
available[t] = c
self.closed[t] = key
n = len(self.all)
limit = self.size
if n > limit:
reporter = logger.warn
if n > 2 * limit:
reporter = logger.critical
reporter("DB.open() has %s open connections with a size "
"of %s", n, limit)
def repush(self, c, key): def repush(self, c, key):
assert c in self.all self.pools[key].repush(c)
self._reduce_size(strictly_less=True)
available = self.available.get(key)
if available is None:
available = self.available[key] = BTrees.family32.OO.Bucket()
else:
assert c not in available.values()
t = time()
available[t] = c
self.closed[t] = key
def _reduce_size(self, strictly_less=False): def _reduce_size(self, strictly_less=False):
if self.timeout is None: for key, pool in list(self.pools.items()):
threshhold = None pool._reduce_size(strictly_less)
else: if not pool.all:
threshhold = time() - self.timeout del self.pools[key]
target = self.size
if strictly_less:
target -= 1
for t, key in tuple(self.closed.items()):
if (len(self.available) > target or
threshhold is not None and t < threshhold):
del self.closed[t]
c = self.available[key].pop(t)
if not self.available[key]:
del self.available[key]
self.all.remove(c)
c._resetCache()
else:
break
def reduce_size(self): def reduce_size(self):
self._reduce_size() self._reduce_size()
def pop(self, key): def pop(self, key):
result = None pool = self.pools.get(key)
available = self.available.get(key) if pool is not None:
if available: return pool.pop()
t = available.maxKey()
result = available.pop(t)
del self.closed[t]
if not available:
del self.available[key]
assert result in self.all
return result
def map(self, f): def map(self, f):
self.all.map(f) for pool in self.pools.itervalues():
pool.map(f)
def availableGC(self): def availableGC(self):
if self.timeout is None: for key, pool in self.pools.items():
threshhold = None pool.availableGC()
else: if not pool.all:
threshhold = time() - self.timeout del self.pools[key]
for t, key in tuple(self.closed.items()):
if threshhold is not None and t < threshhold: @property
del self.closed[t] def test_all(self):
c = self.available[key].pop(t) result = set()
if not self.available[key]: for pool in self.pools.itervalues():
del self.available[key] result.update(pool.all)
self.all.remove(c) return frozenset(result)
c._resetCache()
else: @property
self.available[key][t].cacheGC() def test_available(self):
result = []
for pool in self.pools.itervalues():
result.extend(pool.available)
return tuple(result)
def toTimeStamp(dt): def toTimeStamp(dt):
utc_struct = dt.utctimetuple() utc_struct = dt.utctimetuple()
......
...@@ -19,7 +19,9 @@ to be layered over a base database. ...@@ -19,7 +19,9 @@ to be layered over a base database.
The base storage must not change. The base storage must not change.
""" """
import os
import random import random
import weakref
import tempfile import tempfile
import threading import threading
import ZODB.blob import ZODB.blob
...@@ -36,35 +38,41 @@ class DemoStorage(object): ...@@ -36,35 +38,41 @@ class DemoStorage(object):
ZODB.interfaces.IStorageIteration, ZODB.interfaces.IStorageIteration,
) )
def __init__(self, name=None, base=None, changes=None, def __init__(self, name=None, base=None, changes=None):
keep_base_open=False):
self._keep_base_open = keep_base_open
if base is None: if base is None:
base = ZODB.MappingStorage.MappingStorage() base = ZODB.MappingStorage.MappingStorage()
self._temporary_base = True
else:
self._temporary_base = False
self.base = base self.base = base
if changes is None: if changes is None:
changes = ZODB.MappingStorage.MappingStorage() changes = ZODB.MappingStorage.MappingStorage()
zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage) zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage)
self._temporary_changes = True self._temporary_changes = True
self._blob_dir = None
else: else:
if ZODB.interfaces.IBlobStorage.providedBy(changes): if ZODB.interfaces.IBlobStorage.providedBy(changes):
zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage) zope.interface.alsoProvides(self, ZODB.interfaces.IBlobStorage)
self._temporary_changes = False self._temporary_changes = False
self.changes = changes self.changes = changes
if name is None: if name is None:
name = 'DemoStorage(%r, %r)' % (base.getName(), changes.getName()) name = 'DemoStorage(%r, %r)' % (base.getName(), changes.getName())
self.__name__ = name self.__name__ = name
self._copy_methods_from_changes(changes) self._copy_methods_from_changes(changes)
def _blobify(self): def _blobify(self):
if self._temporary_changes and self._blob_dir is None: if (self._temporary_changes and
self._blob_dir = tempfile.mkdtemp('blobs') isinstance(self.changes, ZODB.MappingStorage.MappingStorage)
self.changes = ZODB.blob.BlobStorage(self._blob_dir, self.changes) ):
blob_dir = tempfile.mkdtemp('.demoblobs')
_temporary_blobdirs[
weakref.ref(self, cleanup_temporary_blobdir)
] = blob_dir
self.changes = ZODB.blob.BlobStorage(blob_dir, self.changes)
self._copy_methods_from_changes(self.changes) self._copy_methods_from_changes(self.changes)
return True return True
...@@ -73,11 +81,10 @@ class DemoStorage(object): ...@@ -73,11 +81,10 @@ class DemoStorage(object):
self.changes.cleanup() self.changes.cleanup()
def close(self): def close(self):
if not self._keep_base_open: if not self._temporary_base:
self.base.close() self.base.close()
self.changes.close() if not self._temporary_changes:
if getattr(self, '_blob_dir', ''): self.changes.close()
ZODB.blob.remove_committed_dir(self._blob_dir)
def _copy_methods_from_changes(self, changes): def _copy_methods_from_changes(self, changes):
for meth in ( for meth in (
...@@ -195,6 +202,13 @@ class DemoStorage(object): ...@@ -195,6 +202,13 @@ class DemoStorage(object):
pass # The gc arg isn't supported. Don't pack pass # The gc arg isn't supported. Don't pack
raise raise
def pop(self):
self.changes.close()
return self.base
def push(self, changes=None):
return self.__class__(base=self, changes=changes)
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
assert version=='', "versions aren't supported" assert version=='', "versions aren't supported"
...@@ -231,3 +245,12 @@ class DemoStorage(object): ...@@ -231,3 +245,12 @@ class DemoStorage(object):
if self._blobify(): if self._blobify():
return self.changes.temporaryDirectory() return self.changes.temporaryDirectory()
raise raise
_temporary_blobdirs = {}
def cleanup_temporary_blobdir(
ref,
_temporary_blobdirs=_temporary_blobdirs, # Make sure it stays around
):
blob_dir = _temporary_blobdirs.pop(ref, None)
if blob_dir and os.path.exists(blob_dir):
ZODB.blob.remove_committed_dir(blob_dir)
...@@ -120,37 +120,66 @@ Undo methods are simply copied from the changes storage: ...@@ -120,37 +120,66 @@ Undo methods are simply copied from the changes storage:
... ] ... ]
[True, True, True, True] [True, True, True, True]
>>> db.close()
Normally, when we close a demo storage, the changes and base storages Storage Stacking
are closed: ================
>>> db.close() A common use case is to stack demo storages. DemoStorage provides
>>> base._file.closed some helper functions to help with this. The push method, just
creates a new demo storage who's base is the original demo storage:
>>> demo = DemoStorage()
>>> demo2 = demo.push()
>>> demo2.base is demo
True True
>>> changes._file.closed
We can also supply an explicit changes storage, if we wish:
>>> from ZODB.MappingStorage import MappingStorage
>>> changes = MappingStorage()
>>> demo3 = demo2.push(changes)
>>> demo3.changes is changes, demo3.base is demo2
(True, True)
The pop method closes the changes storage and returns the base
*without* closing it:
>>> demo3.pop() is demo2
True True
A common use case is to stack multiple DemoStorages, returning to a >>> changes.opened()
previous state by popping a DemoStorage off the stack. In this case, False
we want to leave the base storage open:
>>> base = FileStorage('base.fs', read_only=True) Special backward compatibility support
>>> storage = DemoStorage(base=base, keep_base_open=True) --------------------------------------
Here, we didn't specify a changes storage. A MappingStorage was Normally, when a demo storage is closed, it's base and changes
automatically created: storage are closed:
>>> type(storage.changes).__name__ >>> demo = DemoStorage(base=MappingStorage(), changes=MappingStorage())
'MappingStorage' >>> demo.close()
>>> demo.base.opened(), demo.changes.opened()
(False, False)
Because we specified the keep_base_open option, the base storage is Older versions of DemoStorage didn't have a separate changes storage
left open when we close the DemoStorage: and didn't close or discard their changes when they were closed. When
a stack was built solely of demo storages, the close method
effectively did nothing. To maintain backward compatibility, when no
base or changes storage is supplied in the constructor, the underlying
storage created by the demo storage isn't closed by the demo storage.
This backward-compatibility is deprecated.
>>> storage.close() >>> demo = DemoStorage()
>>> base._file.closed >>> demo.close()
False >>> demo.changes.opened(), demo.base.opened()
>>> storage.changes.opened() (True, True)
False
>>> demo = DemoStorage(base=MappingStorage())
>>> demo2 = demo.push()
>>> demo2.close()
>>> demo2.changes.opened(), demo2.base.base.opened()
(True, False)
Blob Support Blob Support
============ ============
...@@ -236,6 +265,65 @@ storage wrapped around it when necessary: ...@@ -236,6 +265,65 @@ storage wrapped around it when necessary:
.. Check that the temporary directory is gone .. Check that the temporary directory is gone
For now, it won't go until the storage does.
>>> transaction.abort()
>>> conn.close()
>>> blobdir = storage.temporaryDirectory()
>>> del db, conn, storage, _
>>> import gc
>>> _ = gc.collect()
>>> import os >>> import os
>>> os.path.exists(storage.temporaryDirectory()) >>> os.path.exists(blobdir)
False False
ZConfig support
===============
You can configure demo storages using ZConfig, using name, changes,
and base options:
>>> import ZODB.config
>>> storage = ZODB.config.storageFromString("""
... <demostorage>
... </demostorage>
... """)
>>> storage.getName()
"DemoStorage('MappingStorage', 'MappingStorage')"
>>> storage = ZODB.config.storageFromString("""
... <demostorage>
... <filestorage base>
... path base.fs
... </filestorage>
...
... <filestorage changes>
... path changes.fs
... </filestorage>
... </demostorage>
... """)
>>> storage.getName()
"DemoStorage('base.fs', 'changes.fs')"
>>> storage.close()
>>> storage = ZODB.config.storageFromString("""
... <demostorage>
... name bob
... <filestorage>
... path base.fs
... </filestorage>
...
... <filestorage changes>
... path changes.fs
... </filestorage>
... </demostorage>
... """)
>>> storage.getName()
'bob'
>>> storage.base.getName()
'base.fs'
>>> storage.close()
...@@ -175,8 +175,7 @@ ...@@ -175,8 +175,7 @@
<sectiontype name="demostorage" datatype=".DemoStorage" <sectiontype name="demostorage" datatype=".DemoStorage"
implements="ZODB.storage"> implements="ZODB.storage">
<key name="name" /> <key name="name" />
<section type="ZODB.storage" name="*" attribute="base" /> <multisection type="ZODB.storage" name="*" attribute="factories" />
<section type="ZODB.storage" name="changes" attribute="changes" />
</sectiontype> </sectiontype>
......
...@@ -120,15 +120,17 @@ class MappingStorage(BaseConfig): ...@@ -120,15 +120,17 @@ class MappingStorage(BaseConfig):
class DemoStorage(BaseConfig): class DemoStorage(BaseConfig):
def open(self): def open(self):
base = changes = None
for factory in self.config.factories:
if factory.name == 'changes':
changes = factory.open()
else:
if base is None:
base = factory.open()
else:
raise ValueError("Too many base storages defined!")
from ZODB.DemoStorage import DemoStorage from ZODB.DemoStorage import DemoStorage
if self.config.base:
base = self.config.base.open()
else:
base = None
if self.config.changes:
changes = self.config.changes.open()
else:
changes = None
return DemoStorage(self.config.name, base=base, changes=changes) return DemoStorage(self.config.name, base=base, changes=changes)
class FileStorage(BaseConfig): class FileStorage(BaseConfig):
......
...@@ -13,11 +13,7 @@ development continues on a "development" head. ...@@ -13,11 +13,7 @@ development continues on a "development" head.
A database can be opened historically ``at`` or ``before`` a given transaction A database can be opened historically ``at`` or ``before`` a given transaction
serial or datetime. Here's a simple example. It should work with any storage serial or datetime. Here's a simple example. It should work with any storage
that supports ``loadBefore``. Unfortunately that does not include that supports ``loadBefore``.
MappingStorage, so we use a FileStorage instance. Also unfortunately, as of
this writing there is no reliable way to determine if a storage truly
implements loadBefore, or if it simply returns None (as in BaseStorage), other
than reading code.
We'll begin our example with a fairly standard set up. We We'll begin our example with a fairly standard set up. We
...@@ -28,11 +24,8 @@ We'll begin our example with a fairly standard set up. We ...@@ -28,11 +24,8 @@ We'll begin our example with a fairly standard set up. We
- modify the database again; and - modify the database again; and
- commit a transaction. - commit a transaction.
>>> import ZODB.FileStorage >>> import ZODB.MappingStorage
>>> storage = ZODB.FileStorage.FileStorage( >>> db = ZODB.MappingStorage.DB()
... 'HistoricalConnectionTests.fs', create=True)
>>> import ZODB
>>> db = ZODB.DB(storage)
>>> conn = db.open() >>> conn = db.open()
>>> import persistent.mapping >>> import persistent.mapping
...@@ -42,14 +35,13 @@ We'll begin our example with a fairly standard set up. We ...@@ -42,14 +35,13 @@ We'll begin our example with a fairly standard set up. We
>>> import transaction >>> import transaction
>>> transaction.commit() >>> transaction.commit()
We wait for some ttime to pass, and then make some other changes. We wait for some time to pass, record he time, and then make some other changes.
>>> import time >>> import time
>>> t = time.time() >>> t = time.time()
>>> while time.time() <= t: >>> while time.time() <= t:
... time.sleep(.001) ... time.sleep(.001)
>>> import datetime >>> import datetime
>>> now = datetime.datetime.utcnow() >>> now = datetime.datetime.utcnow()
...@@ -164,186 +156,80 @@ historical connection should be kept. ...@@ -164,186 +156,80 @@ historical connection should be kept.
>>> db.getHistoricalTimeout() >>> db.getHistoricalTimeout()
400 400
All three of these values can be specified in a ZConfig file. We're using All three of these values can be specified in a ZConfig file.
mapping storage for simplicity, but remember, as we said at the start of this
document, mapping storage will not work for historical connections (and in fact
may seem to work but then fail confusingly) because it does not implement
loadBefore.
>>> import ZODB.config >>> import ZODB.config
>>> db2 = ZODB.config.databaseFromString(''' >>> db2 = ZODB.config.databaseFromString('''
... <zodb> ... <zodb>
... <mappingstorage/> ... <mappingstorage/>
... historical-pool-size 5 ... historical-pool-size 3
... historical-cache-size 1500 ... historical-cache-size 1500
... historical-timeout 6m ... historical-timeout 6m
... </zodb> ... </zodb>
... ''') ... ''')
>>> db2.getHistoricalPoolSize() >>> db2.getHistoricalPoolSize()
5 3
>>> db2.getHistoricalCacheSize() >>> db2.getHistoricalCacheSize()
1500 1500
>>> db2.getHistoricalTimeout() >>> db2.getHistoricalTimeout()
360 360
Let's actually look at these values at work by shining some light into what
has been a black box up to now. We'll actually do some white box examination
of what is going on in the database, pools and connections.
Historical connections are held in a single connection pool with mappings
from the ``before`` TID to available connections. First we'll put a new
pool on the database so we have a clean slate.
>>> historical_conn.close()
>>> from ZODB.DB import KeyedConnectionPool
>>> db.historical_pool = KeyedConnectionPool(
... db.historical_pool.size, db.historical_pool.timeout)
Now lets look what happens to the pool when we create and close an historical The pool lets us reuse connections. To see this, we'll open some
connection. connections, close them, and then open them again:
>>> pool = db.historical_pool >>> conns1 = [db2.open(before=serial) for i in range(4)]
>>> len(pool.all) >>> _ = [c.close() for c in conns1]
0 >>> conns2 = [db2.open(before=serial) for i in range(4)]
>>> len(pool.available)
0
>>> historical_conn = db.open(
... transaction_manager=transaction1, before=serial)
>>> len(pool.all)
1
>>> len(pool.available)
0
>>> historical_conn in pool.all
True
>>> historical_conn.close()
>>> len(pool.all)
1
>>> len(pool.available)
1
>>> pool.available.keys()[0] == serial
True
>>> len(pool.available.values()[0])
1
Now we'll open and close two for the same serial to see what happens to the Now let's look at what we got. The first connection in conns 2 is the
data structures. last connection in conns1, because it was the last connection closed.
>>> historical_conn is db.open( >>> conns2[0] is conns1[-1]
... transaction_manager=transaction1, before=serial)
True True
>>> len(pool.all)
1
>>> len(pool.available)
0
>>> transaction2 = transaction.TransactionManager()
>>> historical_conn2 = db.open(
... transaction_manager=transaction2, before=serial)
>>> len(pool.all)
2
>>> len(pool.available)
0
>>> historical_conn2.close()
>>> len(pool.all)
2
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
1
>>> historical_conn.close()
>>> len(pool.all)
2
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
2
If you change the historical cache size, that changes the size of the Also for the next two:
persistent cache on our connection.
>>> historical_conn._cache.cache_size >>> (conns2[1] is conns1[-2]), (conns2[2] is conns1[-3])
2000 (True, True)
>>> db.setHistoricalCacheSize(1500)
>>> historical_conn._cache.cache_size
1500
Now let's look at pool sizes. We'll set it to two, then open and close three But not for the last:
connections. We should end up with only two available connections.
>>> db.setHistoricalPoolSize(2) >>> conns2[3] is conns1[-4]
False
>>> historical_conn = db.open( Because the pool size was set to 3.
... transaction_manager=transaction1, before=serial)
>>> historical_conn2 = db.open(
... transaction_manager=transaction2, before=serial)
>>> transaction3 = transaction.TransactionManager()
>>> historical_conn3 = db.open(
... transaction_manager=transaction3, at=historical_serial)
>>> len(pool.all)
3
>>> len(pool.available)
0
>>> historical_conn3.close() Connections are also discarded if they haven't been used in a while.
>>> len(pool.all) To see this, let's close two of the connections:
3
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
1
>>> historical_conn2.close() >>> conns2[0].close(); conns2[1].close()
>>> len(pool.all)
3
>>> len(pool.available)
2
>>> len(pool.available.values()[0])
1
>>> len(pool.available.values()[1])
1
>>> historical_conn.close() We'l also set the historical timeout to be very low:
>>> len(pool.all)
2
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
2
Notice it dumped the one that was closed at the earliest time. >>> db2.setHistoricalTimeout(.01)
>>> time.sleep(.1)
>>> conns2[2].close(); conns2[3].close()
Finally, we'll look at the timeout. We'll need to monkeypatch ``time`` for Now, when we open 4 connections:
this. (The funky __import__ of DB is because some ZODB __init__ shenanigans
make the DB class mask the DB module.)
>>> db.getHistoricalTimeout() >>> conns1 = [db2.open(before=serial) for i in range(4)]
400
>>> import time
>>> delta = 200
>>> def stub_time():
... return time.time() + delta
...
>>> DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
>>> original_time = DB_module.time
>>> DB_module.time = stub_time
>>> historical_conn = db.open(before=serial) We'll see that only the last 2 connections from conn2 are in the
result:
>>> len(pool.all) >>> [c in conns1 for c in conns2]
2 [False, False, True, True]
>>> len(pool.available)
1
A close or an open will do garbage collection on the timed out connections.
>>> delta += 200 If you change the historical cache size, that changes the size of the
>>> historical_conn.close() persistent cache on our connection.
>>> len(pool.all) >>> historical_conn._cache.cache_size
1 2000
>>> len(pool.available) >>> db.setHistoricalCacheSize(1500)
1 >>> historical_conn._cache.cache_size
>>> len(pool.available.values()[0]) 1500
1
Invalidations Invalidations
============= =============
......
...@@ -239,12 +239,12 @@ Closing connections adds them to the stack: ...@@ -239,12 +239,12 @@ Closing connections adds them to the stack:
Closing another one will purge the one with MARKER 0 from the stack Closing another one will purge the one with MARKER 0 from the stack
(since it was the first added to the stack): (since it was the first added to the stack):
>>> [c.MARKER for c in pool.available.values()] >>> [c.MARKER for (t, c) in pool.available]
[0, 1, 2] [0, 1, 2]
>>> conns[0].close() # MARKER 3 >>> conns[0].close() # MARKER 3
>>> len(pool.available), len(pool.all) >>> len(pool.available), len(pool.all)
(3, 5) (3, 5)
>>> [c.MARKER for c in pool.available.values()] >>> [c.MARKER for (t, c) in pool.available]
[1, 2, 3] [1, 2, 3]
Similarly for the other two: Similarly for the other two:
...@@ -252,7 +252,7 @@ Similarly for the other two: ...@@ -252,7 +252,7 @@ Similarly for the other two:
>>> conns[1].close(); conns[2].close() >>> conns[1].close(); conns[2].close()
>>> len(pool.available), len(pool.all) >>> len(pool.available), len(pool.all)
(3, 3) (3, 3)
>>> [c.MARKER for c in pool.available.values()] >>> [c.MARKER for (t, c) in pool.available]
[3, 4, 5] [3, 4, 5]
Reducing the pool size may also purge the oldest closed connections: Reducing the pool size may also purge the oldest closed connections:
...@@ -260,7 +260,7 @@ Reducing the pool size may also purge the oldest closed connections: ...@@ -260,7 +260,7 @@ Reducing the pool size may also purge the oldest closed connections:
>>> db.setPoolSize(2) # gets rid of MARKER 3 >>> db.setPoolSize(2) # gets rid of MARKER 3
>>> len(pool.available), len(pool.all) >>> len(pool.available), len(pool.all)
(2, 2) (2, 2)
>>> [c.MARKER for c in pool.available.values()] >>> [c.MARKER for (t, c) in pool.available]
[4, 5] [4, 5]
Since MARKER 5 is still the last one added to the stack, it will be the Since MARKER 5 is still the last one added to the stack, it will be the
......
...@@ -25,10 +25,7 @@ def setUp(test): ...@@ -25,10 +25,7 @@ def setUp(test):
def tearDown(test): def tearDown(test):
test.globs['db'].close() test.globs['db'].close()
test.globs['db2'].close() test.globs['db2'].close()
test.globs['storage'].close()
# the DB class masks the module because of __init__ shenanigans # the DB class masks the module because of __init__ shenanigans
DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
DB_module.time = test.globs['original_time']
module.tearDown(test) module.tearDown(test)
ZODB.tests.util.tearDown(test) ZODB.tests.util.tearDown(test)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment