Commit f4d83af9 authored by Gary Poster's avatar Gary Poster

incrementally remove some version support (this time from connection and DB);...

incrementally remove some version support (this time from connection and DB); add support for historical read-only connections.
parent 4e92bbdc
......@@ -28,6 +28,14 @@ General
using `int` for memory sizes which caused errors on x86_64 Intel Xeon
machines (using 64-bit Linux).
- (unreleased, after 3.9.0a1) Removed version support from connections and
DB. Versions are still in the storages; this is an incremental step.
- (unreleased, after 3.9.0a1) Added support for read-only, historical
connections based on datetimes or serials (TIDs). See
src/ZODB/historical_connections.txt.
ZEO
---
......
......@@ -46,7 +46,7 @@ class MappingStorageConfig:
def getConfig(self, path, create, read_only):
return """<mappingstorage 1/>"""
class FileStorageConnectionTests(
FileStorageConfig,
ConnectionTests.ConnectionTests,
......
......@@ -4,7 +4,7 @@ ZEO Fan Out
We should be able to set up ZEO servers with ZEO clients. Let's see
if we can make it work.
We'll use some helper functions. The first is a helpter that starts
We'll use some helper functions. The first is a helper that starts
ZEO servers for us and another one that picks ports.
We'll start the first server:
......@@ -16,7 +16,7 @@ We'll start the first server:
... '<filestorage 1>\n path fs\n</filestorage>\n', zconf0, port0)
Then we''ll start 2 others that use this one:
Then we'll start 2 others that use this one:
>>> port1 = ZEO.tests.testZEO.get_port()
>>> zconf1 = ZEO.tests.forker.ZEOConfig(('', port1))
......
......@@ -44,7 +44,7 @@ from ZODB.ExportImport import ExportImport
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.POSException import Unsupported
from ZODB.POSException import Unsupported, ReadOnlyHistoryError
from ZODB.POSException import POSKeyError
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
......@@ -79,17 +79,20 @@ class Connection(ExportImport, object):
##########################################################################
# Connection methods, ZODB.IConnection
def __init__(self, db, version='', cache_size=400):
def __init__(self, db, cache_size=400, before=None):
"""Create a new Connection."""
self._log = logging.getLogger('ZODB.Connection')
self._debug_info = ()
self._db = db
# historical connection
self.before = before
# Multi-database support
self.connections = {self._db.database_name: self}
self._version = version
self._normal_storage = self._storage = db._storage
self.new_oid = db._storage.new_oid
self._savepoint_storage = None
......@@ -112,13 +115,6 @@ class Connection(ExportImport, object):
# objects immediately load their state whern they get their
# persistent data set.
self._pre_cache = {}
if version:
# Caches for versions end up empty if the version
# is not used for a while. Non-version caches
# keep their content indefinitely.
# Unclear: Why do we want version caches to behave this way?
self._cache.cache_drain_resistance = 100
# List of all objects (not oids) registered as modified by the
# persistence machinery, or by add(), or whose access caused a
......@@ -186,8 +182,6 @@ class Connection(ExportImport, object):
# the upper bound on transactions visible to this connection.
# That is, all object revisions must be written before _txn_time.
# If it is None, then the current revisions are acceptable.
# If the connection is in a version, mvcc will be disabled, because
# loadBefore() only returns non-version data.
self._txn_time = None
# To support importFile(), implemented in the ExportImport base
......@@ -240,8 +234,11 @@ class Connection(ExportImport, object):
# This appears to be an MVCC violation because we are loading
# the must recent data when perhaps we shouldnt. The key is
# that we are only creating a ghost!
p, serial = self._storage.load(oid, self._version)
# that we are only creating a ghost!
# A disadvantage to this optimization is that _p_serial cannot be
# trusted until the object has been loaded, which affects both MVCC
# and historical connections.
p, serial = self._storage.load(oid, '')
obj = self._reader.getGhost(p)
# Avoid infiniate loop if obj tries to load its state before
......@@ -318,13 +315,17 @@ class Connection(ExportImport, object):
return self._db
def isReadOnly(self):
"""Returns True if the storage for this connection is read only."""
"""Returns True if this connection is read only."""
if self._opened is None:
raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly()
return self.before is not None or self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
if self.before is not None and tid > self.before:
# this is an historical connection, and the tid is after the
# freeze. Invalidations are irrelevant.
return
self._inv_lock.acquire()
try:
if self._txn_time is None:
......@@ -339,25 +340,18 @@ class Connection(ExportImport, object):
self._invalidatedCache = True
finally:
self._inv_lock.release()
def root(self):
"""Return the database root object."""
return self.get(z64)
def getVersion(self):
"""Returns the version this connection is attached to."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._version
def get_connection(self, database_name):
"""Return a Connection for the named database."""
connection = self.connections.get(database_name)
if connection is None:
new_con = self._db.databases[database_name].open(
transaction_manager=self.transaction_manager,
version=self._version,
before=self.before,
)
self.connections.update(new_con.connections)
new_con.connections = self.connections
......@@ -539,6 +533,9 @@ class Connection(ExportImport, object):
def _commit(self, transaction):
"""Commit changes to an object"""
if self.before is not None:
raise ReadOnlyHistoryError()
if self._import:
# We are importing an export file. We alsways do this
......@@ -618,15 +615,14 @@ class Connection(ExportImport, object):
raise ValueError("Can't commit with opened blobs.")
s = self._storage.storeBlob(oid, serial, p,
obj._uncommitted(),
self._version, transaction)
'', transaction)
# we invalidate the object here in order to ensure
# that that the next attribute access of its name
# unghostify it, which will cause its blob data
# to be reattached "cleanly"
obj._p_invalidate()
else:
s = self._storage.store(oid, serial, p, self._version,
transaction)
s = self._storage.store(oid, serial, p, '', transaction)
self._store_count += 1
# Put the object in the cache before handling the
# response, just in case the response contains the
......@@ -825,37 +821,63 @@ class Connection(ExportImport, object):
# the code if we could drop support for it.
# (BTrees.Length does.)
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if self._invalidatedCache:
raise ReadConflictError()
if (obj._p_oid in self._invalidated and
not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
p, serial = self._storage.load(obj._p_oid, self._version)
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = obj._p_oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
if myhasattr(obj, "_p_independent"):
# This call will raise a ReadConflictError if something
# goes wrong
self._handle_independent(obj)
if self.before is not None:
# Load data that was current before the time we have.
if self._txn_time is not None: # MVCC for readonly future conn.
before = self._txn_time
has_invalidated = True
else:
before = self.before
has_invalidated = False
t = self._storage.loadBefore(obj._p_oid, before)
if t is None:
raise POSKeyError()
p, serial, end = t
if not has_invalidated and end is None:
# MVCC: make sure another thread has not beaten us to the punch
self._inv_lock.acquire()
try:
txn_time = self._txn_time
finally:
self._inv_lock.release()
if txn_time is not None and txn_time < before:
t = self._storage.loadBefore(obj._p_oid, txn_time)
if t is None:
raise POSKeyError()
p, serial, end = t
else:
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if self._invalidatedCache:
raise ReadConflictError()
if (obj._p_oid in self._invalidated and
not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
p, serial = self._storage.load(obj._p_oid, '')
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = obj._p_oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
if myhasattr(obj, "_p_independent"):
# This call will raise a ReadConflictError if something
# goes wrong
self._handle_independent(obj)
else:
self._load_before_or_conflict(obj)
return
self._reader.setGhostState(obj, p)
obj._p_serial = serial
......@@ -867,7 +889,7 @@ class Connection(ExportImport, object):
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not ((not self._version) and self._setstate_noncurrent(obj)):
if not self._setstate_noncurrent(obj):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
......@@ -1029,11 +1051,7 @@ class Connection(ExportImport, object):
# Python protocol
def __repr__(self):
if self._version:
ver = ' (in version %s)' % `self._version`
else:
ver = ''
return '<Connection at %08x%s>' % (positive_id(self), ver)
return '<Connection at %08x>' % (positive_id(self),)
# Python protocol
##########################################################################
......@@ -1043,17 +1061,6 @@ class Connection(ExportImport, object):
__getitem__ = get
def modifiedInVersion(self, oid):
"""Returns the version the object with the given oid was modified in.
If it wasn't modified in a version, the current version of this
connection is returned.
"""
try:
return self._db.modifiedInVersion(oid)
except KeyError:
return self.getVersion()
def exchange(self, old, new):
# called by a ZClasses method that isn't executed by the test suite
oid = old._p_oid
......@@ -1079,7 +1086,7 @@ class Connection(ExportImport, object):
def savepoint(self):
if self._savepoint_storage is None:
tmpstore = TmpStore(self._version, self._normal_storage)
tmpstore = TmpStore(self._normal_storage)
self._savepoint_storage = tmpstore
self._storage = self._savepoint_storage
......@@ -1124,7 +1131,7 @@ class Connection(ExportImport, object):
if isinstance(self._reader.getGhost(data), Blob):
blobfilename = src.loadBlob(oid, serial)
s = self._storage.storeBlob(oid, serial, data, blobfilename,
self._version, transaction)
'', transaction)
# we invalidate the object here in order to ensure
# that that the next attribute access of its name
# unghostify it, which will cause its blob data
......@@ -1132,7 +1139,7 @@ class Connection(ExportImport, object):
self.invalidate(s, {oid:True})
else:
s = self._storage.store(oid, serial, data,
self._version, transaction)
'', transaction)
self._handle_serial(s, oid, change=False)
src.close()
......@@ -1182,23 +1189,13 @@ class TmpStore:
implements(IBlobStorage)
def __init__(self, base_version, storage):
def __init__(self, storage):
self._storage = storage
for method in (
'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore',
):
setattr(self, method, getattr(storage, method))
try:
supportsVersions = storage.supportsVersions
except AttributeError:
pass
else:
if supportsVersions():
self.modifiedInVersion = storage.modifiedInVersion
self.versionEmpty = storage.versionEmpty
self._base_version = base_version
self._file = tempfile.TemporaryFile()
# position: current file position
# _tpos: file position at last commit point
......@@ -1216,7 +1213,7 @@ class TmpStore:
def load(self, oid, version):
pos = self.index.get(oid)
if pos is None:
return self._storage.load(oid, self._base_version)
return self._storage.load(oid, '')
self._file.seek(pos)
h = self._file.read(8)
oidlen = u64(h)
......@@ -1231,7 +1228,7 @@ class TmpStore:
def store(self, oid, serial, data, version, transaction):
# we have this funny signature so we can reuse the normal non-commit
# commit logic
assert version == self._base_version
assert version == ''
self._file.seek(self.position)
l = len(data)
if serial is None:
......@@ -1245,7 +1242,8 @@ class TmpStore:
def storeBlob(self, oid, serial, data, blobfilename, version,
transaction):
serial = self.store(oid, serial, data, version, transaction)
assert version == ''
serial = self.store(oid, serial, data, '', transaction)
targetpath = self._getBlobPath(oid)
if not os.path.exists(targetpath):
......
......@@ -21,6 +21,8 @@ import cPickle, cStringIO, sys
import threading
from time import time, ctime
import logging
import datetime
import calendar
from ZODB.broken import find_global
from ZODB.utils import z64
......@@ -31,8 +33,11 @@ from ZODB.utils import WeakSet
from zope.interface import implements
from ZODB.interfaces import IDatabase
import BTrees.OOBTree
import transaction
from persistent.TimeStamp import TimeStamp
logger = logging.getLogger('ZODB.DB')
......@@ -62,10 +67,14 @@ class _ConnectionPool(object):
connectionDebugInfo() can still gather statistics.
"""
def __init__(self, pool_size):
def __init__(self, pool_size, timeout=None):
# The largest # of connections we expect to see alive simultaneously.
self.pool_size = pool_size
# The minimum number of seconds that an available connection should
# be kept, or None.
self.timeout = timeout
# A weak set of all connections we've seen. A connection vanishes
# from this set if pop() hands it out, it's not reregistered via
# repush(), and it becomes unreachable.
......@@ -75,10 +84,9 @@ class _ConnectionPool(object):
# of self.all. push() and repush() add to this, and may remove
# the oldest available connections if the pool is too large.
# pop() pops this stack. There are never more than pool_size entries
# in this stack.
# In Python 2.4, a collections.deque would make more sense than
# a list (we push only "on the right", but may pop from both ends).
self.available = []
# in this stack. The keys are time.time() values of the push or
# repush calls.
self.available = BTrees.OOBTree.Bucket()
def set_pool_size(self, pool_size):
"""Change our belief about the expected maximum # of live connections.
......@@ -89,6 +97,13 @@ class _ConnectionPool(object):
self.pool_size = pool_size
self._reduce_size()
def set_timeout(self, timeout):
old = self.timeout
self.timeout = timeout
if timeout is not None and old != timeout and (
old is None or old > timeout):
self._reduce_size()
def push(self, c):
"""Register a new available connection.
......@@ -96,10 +111,10 @@ class _ConnectionPool(object):
stack even if we're over the pool size limit.
"""
assert c not in self.all
assert c not in self.available
assert c not in self.available.values()
self._reduce_size(strictly_less=True)
self.all.add(c)
self.available.append(c)
self.available[time()] = c
n = len(self.all)
limit = self.pool_size
if n > limit:
......@@ -116,34 +131,46 @@ class _ConnectionPool(object):
older available connections.
"""
assert c in self.all
assert c not in self.available
assert c not in self.available.values()
self._reduce_size(strictly_less=True)
self.available.append(c)
self.available[time()] = c
def _reduce_size(self, strictly_less=False):
"""Throw away the oldest available connections until we're under our
target size (strictly_less=False, the default) or no more than that
(strictly_less=True).
"""
if self.timeout is None:
threshhold = None
else:
threshhold = time() - self.timeout
target = self.pool_size
if strictly_less:
target -= 1
while len(self.available) > target:
c = self.available.pop(0)
self.all.remove(c)
# While application code may still hold a reference to `c`,
# there's little useful that can be done with this Connection
# anymore. Its cache may be holding on to limited resources,
# and we replace the cache with an empty one now so that we
# don't have to wait for gc to reclaim it. Note that it's not
# possible for DB.open() to return `c` again: `c` can never
# be in an open state again.
# TODO: Perhaps it would be better to break the reference
# cycles between `c` and `c._cache`, so that refcounting reclaims
# both right now. But if user code _does_ have a strong
# reference to `c` now, breaking the cycle would not reclaim `c`
# now, and `c` would be left in a user-visible crazy state.
c._resetCache()
for t, c in list(self.available.items()):
if (len(self.available) > target or
threshhold is not None and t < threshhold):
del self.available[t]
self.all.remove(c)
# While application code may still hold a reference to `c`,
# there's little useful that can be done with this Connection
# anymore. Its cache may be holding on to limited resources,
# and we replace the cache with an empty one now so that we
# don't have to wait for gc to reclaim it. Note that it's not
# possible for DB.open() to return `c` again: `c` can never be
# in an open state again.
# TODO: Perhaps it would be better to break the reference
# cycles between `c` and `c._cache`, so that refcounting
# reclaims both right now. But if user code _does_ have a
# strong reference to `c` now, breaking the cycle would not
# reclaim `c` now, and `c` would be left in a user-visible
# crazy state.
c._resetCache()
else:
break
def reduce_size(self):
self._reduce_size()
def pop(self):
"""Pop an available connection and return it.
......@@ -154,23 +181,56 @@ class _ConnectionPool(object):
"""
result = None
if self.available:
result = self.available.pop()
result = self.available.pop(self.available.maxKey())
# Leave it in self.all, so we can still get at it for statistics
# while it's alive.
assert result in self.all
return result
def map(self, f, open_connections=True):
"""For every live connection c, invoke f(c).
If `open_connections` is false then only call f(c) on closed
connections.
"""
if open_connections:
self.all.map(f)
def map(self, f):
"""For every live connection c, invoke f(c)."""
self.all.map(f)
def availableGC(self):
"""Perform garbage collection on available connections.
If a connection is no longer viable because it has timed out, it is
garbage collected."""
if self.timeout is None:
threshhold = None
else:
threshhold = time() - self.timeout
for t, c in tuple(self.available.items()):
if threshhold is not None and t < threshhold:
del self.available[t]
self.all.remove(c)
c._resetCache()
else:
c.cacheGC()
def toTimeStamp(dt):
utc_struct = dt.utctimetuple()
# if this is a leapsecond, this will probably fail. That may be a good
# thing: leapseconds are not really accounted for with serials.
args = utc_struct[:5]+(utc_struct[5] + dt.microsecond/1000000.0,)
return TimeStamp(*args)
def getTID(at, before):
if at is not None:
if before is not None:
raise ValueError('can only pass zero or one of `at` and `before`')
if isinstance(at, datetime.datetime):
at = toTimeStamp(at)
else:
map(f, self.available)
at = TimeStamp(at)
before = repr(at.laterThan(at))
elif before is not None:
if isinstance(before, datetime.datetime):
before = repr(toTimeStamp(before))
else:
before = repr(TimeStamp(before))
return before
class DB(object):
"""The Object Database
......@@ -202,27 +262,27 @@ class DB(object):
- `User Methods`: __init__, open, close, undo, pack, classFactory
- `Inspection Methods`: getName, getSize, objectCount,
getActivityMonitor, setActivityMonitor
- `Connection Pool Methods`: getPoolSize, getVersionPoolSize,
removeVersionPool, setPoolSize, setVersionPoolSize
- `Connection Pool Methods`: getPoolSize, getHistoricalPoolSize,
removeHistoricalPool, setPoolSize, setHistoricalPoolSize,
getHistoricalTimeout, setHistoricalTimeout
- `Transaction Methods`: invalidate
- `Other Methods`: lastTransaction, connectionDebugInfo
- `Version Methods`: modifiedInVersion, abortVersion, commitVersion,
versionEmpty
- `Cache Inspection Methods`: cacheDetail, cacheExtremeDetail,
cacheFullSweep, cacheLastGCTime, cacheMinimize, cacheSize,
cacheDetailSize, getCacheSize, getVersionCacheSize, setCacheSize,
setVersionCacheSize
cacheDetailSize, getCacheSize, getHistoricalCacheSize, setCacheSize,
setHistoricalCacheSize
"""
implements(IDatabase)
klass = Connection # Class to use for connections
_activity_monitor = None
_activity_monitor = next = previous = None
def __init__(self, storage,
pool_size=7,
cache_size=400,
version_pool_size=3,
version_cache_size=100,
historical_pool_size=3,
historical_cache_size=1000,
historical_timeout=300,
database_name='unnamed',
databases=None,
):
......@@ -232,10 +292,12 @@ class DB(object):
- `storage`: the storage used by the database, e.g. FileStorage
- `pool_size`: expected maximum number of open connections
- `cache_size`: target size of Connection object cache
- `version_pool_size`: expected maximum number of connections (per
version)
- `version_cache_size`: target size of Connection object cache for
version connections
- `historical_pool_size`: expected maximum number of connections (per
historical, or transaction, identifier)
- `historical_cache_size`: target size of Connection object cache for
historical (`at` or `before`) connections
- `historical_timeout`: minimum number of seconds that
an unused historical connection will be kept, or None.
"""
# Allocate lock.
x = threading.RLock()
......@@ -243,12 +305,13 @@ class DB(object):
self._r = x.release
# Setup connection pools and cache info
# _pools maps a version string to a _ConnectionPool object.
# _pools maps a tid identifier, or '', to a _ConnectionPool object.
self._pools = {}
self._pool_size = pool_size
self._cache_size = cache_size
self._version_pool_size = version_pool_size
self._version_cache_size = version_cache_size
self._historical_pool_size = historical_pool_size
self._historical_cache_size = historical_cache_size
self._historical_timeout = historical_timeout
# Setup storage
self._storage=storage
......@@ -296,7 +359,6 @@ class DB(object):
databases[database_name] = self
self._setupUndoMethods()
self._setupVersionMethods()
self.history = storage.history
def _setupUndoMethods(self):
......@@ -316,25 +378,6 @@ class DB(object):
raise NotImplementedError
self.undo = undo
def _setupVersionMethods(self):
storage = self._storage
try:
self.supportsVersions = storage.supportsVersions
except AttributeError:
self.supportsVersions = lambda : False
if self.supportsVersions():
self.versionEmpty = storage.versionEmpty
self.versions = storage.versions
self.modifiedInVersion = storage.modifiedInVersion
else:
self.versionEmpty = lambda version: True
self.versions = lambda max=None: ()
self.modifiedInVersion = lambda oid: ''
def commitVersion(*a, **k):
raise NotImplementedError
self.commitVersion = self.abortVersion = commitVersion
# This is called by Connection.close().
def _returnToPool(self, connection):
"""Return a connection to the pool.
......@@ -351,11 +394,11 @@ class DB(object):
if am is not None:
am.closedConnection(connection)
version = connection._version
before = connection.before or ''
try:
pool = self._pools[version]
pool = self._pools[before]
except KeyError:
# No such version. We must have deleted the pool.
# No such tid. We must have deleted the pool.
# Just let the connection go.
# We need to break circular refs to make it really go.
......@@ -368,29 +411,16 @@ class DB(object):
finally:
self._r()
def _connectionMap(self, f, open_connections=True):
"""Call f(c) for all connections c in all pools in all versions.
If `open_connections` is false then f(c) is only called on closed
connections.
def _connectionMap(self, f):
"""Call f(c) for all connections c in all pools, live and historical.
"""
self._a()
try:
for pool in self._pools.values():
pool.map(f, open_connections=open_connections)
pool.map(f)
finally:
self._r()
def abortVersion(self, version, txn=None):
warnings.warn(
"Versions are deprecated and will become unsupported "
"in ZODB 3.9",
DeprecationWarning, 2)
if txn is None:
txn = transaction.get()
txn.register(AbortVersion(self, version))
def cacheDetail(self):
"""Return information on objects in the various caches
......@@ -503,15 +533,6 @@ class DB(object):
"""
self._storage.close()
def commitVersion(self, source, destination='', txn=None):
warnings.warn(
"Versions are deprecated and will become unsupported "
"in ZODB 3.9",
DeprecationWarning, 2)
if txn is None:
txn = transaction.get()
txn.register(CommitVersion(self, source, destination))
def getCacheSize(self):
return self._cache_size
......@@ -527,19 +548,14 @@ class DB(object):
def getSize(self):
return self._storage.getSize()
def getVersionCacheSize(self):
warnings.warn(
"Versions are deprecated and will become unsupported "
"in ZODB 3.9",
DeprecationWarning, 2)
return self._version_cache_size
def getHistoricalCacheSize(self):
return self._historical_cache_size
def getHistoricalPoolSize(self):
return self._historical_pool_size
def getVersionPoolSize(self):
warnings.warn(
"Versions are deprecated and will become unsupported "
"in ZODB 3.9",
DeprecationWarning, 2)
return self._version_pool_size
def getHistoricalTimeout(self):
return self._historical_timeout
def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
......@@ -549,13 +565,11 @@ class DB(object):
passed in to prevent useless (but harmless) messages to the
connection.
"""
if connection is not None:
version = connection._version
# Storages, esp. ZEO tests, need the version argument still. :-/
assert version==''
# Notify connections.
def inval(c):
if (c is not connection and
(not version or c._version == version)):
if c is not connection:
c.invalidate(tid, oids)
self._connectionMap(inval)
......@@ -567,52 +581,51 @@ class DB(object):
def objectCount(self):
return len(self._storage)
def open(self, version='', transaction_manager=None):
def open(self, transaction_manager=None, at=None, before=None):
"""Return a database Connection for use by application code.
The optional `version` argument can be used to specify that a
version connection is desired.
Note that the connection pool is managed as a stack, to
increase the likelihood that the connection's stack will
include useful objects.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- `transaction_manager`: transaction manager to use. None means
use the default transaction manager.
use the default transaction manager.
- `at`: a datetime.datetime or 8 character transaction id of the
time to open the database with a read-only connection. Passing
both `at` and `before` raises a ValueError, and passing neither
opens a standard writable transaction of the newest state.
A timezone-naive datetime.datetime is treated as a UTC value.
- `before`: like `at`, but opens the readonly state before the
tid or datetime.
"""
if version:
if not self.supportsVersions():
raise ValueError(
"Versions are not supported by this database.")
warnings.warn(
"Versions are deprecated and will become unsupported "
"in ZODB 3.9",
DeprecationWarning, 2)
# `at` is normalized to `before`, since we use storage.loadBefore
# as the underlying implementation of both.
before = getTID(at, before)
self._a()
try:
# pool <- the _ConnectionPool for this version
pool = self._pools.get(version)
# pool <- the _ConnectionPool for this `before` tid
pool = self._pools.get(before or '')
if pool is None:
if version:
size = self._version_pool_size
if before is not None:
size = self._historical_pool_size
timeout = self._historical_timeout
else:
size = self._pool_size
self._pools[version] = pool = _ConnectionPool(size)
timeout = None
self._pools[before or ''] = pool = _ConnectionPool(
size, timeout)
assert pool is not None
# result <- a connection
result = pool.pop()
if result is None:
if version:
size = self._version_cache_size
if before is not None:
size = self._historical_cache_size
else:
size = self._cache_size
c = self.klass(self, version, size)
c = self.klass(self, size, before)
pool.push(c)
result = pool.pop()
assert result is not None
......@@ -621,16 +634,23 @@ class DB(object):
result.open(transaction_manager)
# A good time to do some cache cleanup.
self._connectionMap(lambda c: c.cacheGC(), open_connections=False)
# (note we already have the lock)
for key, pool in tuple(self._pools.items()):
pool.availableGC()
if not len(pool.available) and not len(pool.all):
del self._pools[key]
return result
finally:
self._r()
def removeVersionPool(self, version):
def removeHistoricalPool(self, at=None, before=None):
if at is None and before is None:
raise ValueError('must pass one of `at` or `before`')
before = getTID(at, before)
try:
del self._pools[version]
del self._pools[before]
except KeyError:
pass
......@@ -639,7 +659,7 @@ class DB(object):
t = time()
def get_info(c):
# `result`, `time` and `version` are lexically inherited.
# `result`, `time` and `before` are lexically inherited.
o = c._opened
d = c.getDebugInfo()
if d:
......@@ -652,10 +672,10 @@ class DB(object):
result.append({
'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
'info': d,
'version': version,
'before': before,
})
for version, pool in self._pools.items():
for before, pool in self._pools.items():
pool.map(get_info)
return result
......@@ -705,43 +725,47 @@ class DB(object):
finally:
self._r()
def setVersionCacheSize(self, size):
warnings.warn(
"Versions are deprecated and will become unsupported "
"in ZODB 3.9",
DeprecationWarning, 2)
def setHistoricalCacheSize(self, size):
self._a()
try:
self._version_cache_size = size
self._historical_cache_size = size
def setsize(c):
c._cache.cache_size = size
for version, pool in self._pools.items():
if version:
for tid, pool in self._pools.items():
if tid:
pool.map(setsize)
finally:
self._r()
def setPoolSize(self, size):
self._pool_size = size
self._reset_pool_sizes(size, for_versions=False)
self._reset_pool_sizes(size, for_historical=False)
def setVersionPoolSize(self, size):
warnings.warn(
"Versions are deprecated and will become unsupported "
"in ZODB 3.9",
DeprecationWarning, 2)
self._version_pool_size = size
self._reset_pool_sizes(size, for_versions=True)
def setHistoricalPoolSize(self, size):
self._historical_pool_size = size
self._reset_pool_sizes(size, for_historical=True)
def _reset_pool_sizes(self, size, for_versions=False):
def _reset_pool_sizes(self, size, for_historical=False):
self._a()
try:
for version, pool in self._pools.items():
if (version != '') == for_versions:
for tid, pool in self._pools.items():
if (tid != '') == for_historical:
pool.set_pool_size(size)
finally:
self._r()
def setHistoricalTimeout(self, timeout):
self._historical_timeout = timeout
self._a()
try:
for tid, pool in tuple(self._pools.items()):
if tid:
pool.set_timeout(timeout)
if not pool.available and not pool.all:
del self._pools[tid]
finally:
self._r()
def undo(self, id, txn=None):
"""Undo a transaction identified by id.
......@@ -768,7 +792,7 @@ resource_counter_lock = threading.Lock()
resource_counter = 0
class ResourceManager(object):
"""Transaction participation for a version or undo resource."""
"""Transaction participation for an undo resource."""
# XXX This implementation is broken. Subclasses invalidate oids
# in their commit calls. Invalidations should not be sent until
......@@ -811,39 +835,6 @@ class ResourceManager(object):
def commit(self, obj, txn):
raise NotImplementedError
class CommitVersion(ResourceManager):
def __init__(self, db, version, dest=''):
super(CommitVersion, self).__init__(db)
self._version = version
self._dest = dest
def commit(self, ob, t):
# XXX see XXX in ResourceManager
dest = self._dest
tid, oids = self._db._storage.commitVersion(self._version,
self._dest,
t)
oids = dict.fromkeys(oids, 1)
self._db.invalidate(tid, oids, version=self._dest)
if self._dest:
# the code above just invalidated the dest version.
# now we need to invalidate the source!
self._db.invalidate(tid, oids, version=self._version)
class AbortVersion(ResourceManager):
def __init__(self, db, version):
super(AbortVersion, self).__init__(db)
self._version = version
def commit(self, ob, t):
# XXX see XXX in ResourceManager
tid, oids = self._db._storage.abortVersion(self._version, t)
self._db.invalidate(tid,
dict.fromkeys(oids, 1),
version=self._version)
class TransactionalUndo(ResourceManager):
def __init__(self, db, tid):
......
......@@ -47,7 +47,7 @@ class ExportImport:
continue
done_oids[oid] = True
try:
p, serial = load(oid, self._version)
p, serial = load(oid, '')
except:
logger.debug("broken reference for oid %s", repr(oid),
exc_info=True)
......@@ -55,16 +55,16 @@ class ExportImport:
referencesf(p, oids)
f.writelines([oid, p64(len(p)), p])
if supports_blobs:
if not isinstance(self._reader.getGhost(p), Blob):
continue # not a blob
blobfilename = self._storage.loadBlob(oid, serial)
f.write(blob_begin_marker)
f.write(p64(os.stat(blobfilename).st_size))
blobdata = open(blobfilename, "rb")
cp(blobdata, f)
blobdata.close()
if supports_blobs:
if not isinstance(self._reader.getGhost(p), Blob):
continue # not a blob
blobfilename = self._storage.loadBlob(oid, serial)
f.write(blob_begin_marker)
f.write(p64(os.stat(blobfilename).st_size))
blobdata = open(blobfilename, "rb")
cp(blobdata, f)
blobdata.close()
f.write(export_end_marker)
return f
......@@ -127,8 +127,6 @@ class ExportImport:
return Ghost(oid)
version = self._version
while 1:
header = f.read(16)
if header == export_end_marker:
......@@ -180,9 +178,9 @@ class ExportImport:
if blob_filename is not None:
self._storage.storeBlob(oid, None, data, blob_filename,
version, transaction)
'', transaction)
else:
self._storage.store(oid, None, data, version, transaction)
self._storage.store(oid, None, data, '', transaction)
export_end_marker = '\377'*16
......
......@@ -234,6 +234,10 @@ class DanglingReferenceError(TransactionError):
return "from %s to %s" % (oid_repr(self.referer),
oid_repr(self.missing))
############################################################################
# Only used in storages; versions are no longer supported.
class VersionError(POSError):
"""An error in handling versions occurred."""
......@@ -246,6 +250,7 @@ class VersionLockError(VersionError, TransactionError):
An attempt was made to modify an object that has been modified in an
unsaved version.
"""
############################################################################
class UndoError(POSError):
"""An attempt was made to undo a non-undoable transaction."""
......@@ -292,6 +297,9 @@ class ExportError(POSError):
class Unsupported(POSError):
"""A feature was used that is not supported by the storage."""
class ReadOnlyHistoryError(POSError):
"""Unable to add or modify objects in an historical connection."""
class InvalidObjectReference(POSError):
"""An object contains an invalid reference to another object.
......
......@@ -180,16 +180,22 @@
and exceeding twice pool-size connections causes a critical
message to be logged.
</description>
<key name="version-pool-size" datatype="integer" default="3"/>
<key name="historical-pool-size" datatype="integer" default="3"/>
<description>
The expected maximum number of connections simultaneously open
per version.
per historical revision.
</description>
<key name="version-cache-size" datatype="integer" default="100"/>
<key name="historical-cache-size" datatype="integer" default="1000"/>
<description>
Target size, in number of objects, of each version connection's
Target size, in number of objects, of each historical connection's
object cache.
</description>
<key name="historical-timeout" datatype="time-interval"
default="5m"/>
<description>
The minimum interval that an unused historical connection should be
kept.
</description>
<key name="database-name" default="unnamed"/>
<description>
When multidatabases are in use, this is the name given to this
......
......@@ -68,7 +68,6 @@ def storageFromURL(url):
def storageFromConfig(section):
return section.open()
class BaseConfig:
"""Object representing a configured storage or database.
......@@ -99,8 +98,9 @@ class ZODBDatabase(BaseConfig):
return ZODB.DB(storage,
pool_size=section.pool_size,
cache_size=section.cache_size,
version_pool_size=section.version_pool_size,
version_cache_size=section.version_cache_size,
historical_pool_size=section.historical_pool_size,
historical_cache_size=section.historical_cache_size,
historical_timeout=section.historical_timeout,
database_name=section.database_name,
databases=databases)
except:
......
======================
Historical Connections
======================
Usage
=====
A database can be opened with a read-only, historical connection when given
a specific transaction or datetime. This can enable full-context application
level conflict resolution, historical exploration and preparation for reverts,
or even the use of a historical database revision as "production" while
development continues on a "development" head.
A database can be opened historically ``at`` or ``before`` a given transaction
serial or datetime. Here's a simple example. It should work with any storage
that supports ``loadBefore``. Unfortunately that does not include
MappingStorage, so we use a FileStorage instance. Also unfortunately, as of
this writing there is no reliable way to determine if a storage truly
implements loadBefore, or if it simply returns None (as in BaseStorage), other
than reading code.
We'll begin our example with a fairly standard set up. We
- make a storage and a database;
- open a normal connection;
- modify the database through the connection;
- commit a transaction, remembering the time in UTC;
- modify the database again; and
- commit a transaction.
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
... 'HistoricalConnectionTests.fs', create=True)
>>> import ZODB
>>> db = ZODB.DB(storage)
>>> conn = db.open()
>>> import persistent.mapping
>>> conn.root()['first'] = persistent.mapping.PersistentMapping(count=0)
>>> import transaction
>>> transaction.commit()
>>> import datetime
>>> now = datetime.datetime.utcnow()
>>> root = conn.root()
>>> root['second'] = persistent.mapping.PersistentMapping()
>>> root['first']['count'] += 1
>>> transaction.commit()
Now we will show a historical connection. We'll open one using the ``now``
value we generated above, and then demonstrate that the state of the original
connection, at the mutable head of the database, is different than the
historical state.
>>> transaction1 = transaction.TransactionManager()
>>> historical_conn = db.open(transaction_manager=transaction1, at=now)
>>> sorted(conn.root().keys())
['first', 'second']
>>> conn.root()['first']['count']
1
>>> historical_conn.root().keys()
['first']
>>> historical_conn.root()['first']['count']
0
Moreover, the historical connection cannot commit changes.
>>> historical_conn.root()['first']['count'] += 1
>>> historical_conn.root()['first']['count']
1
>>> transaction1.commit()
Traceback (most recent call last):
...
ReadOnlyHistoryError
>>> transaction1.abort()
>>> historical_conn.root()['first']['count']
0
(It is because of the mutable behavior outside of transactional semantics that
we must have a separate connection, and associated object cache, per thread,
even though the semantics should be readonly.)
As demonstrated, a timezone-naive datetime will be interpreted as UTC. You
can also pass a timezone-aware datetime or a serial (transaction id).
Here's opening with a serial--the serial of the root at the time of the first
commit.
>>> historical_serial = historical_conn.root()._p_serial
>>> historical_conn.close()
>>> historical_conn = db.open(transaction_manager=transaction1,
... at=historical_serial)
>>> historical_conn.root().keys()
['first']
>>> historical_conn.root()['first']['count']
0
>>> historical_conn.close()
We've shown the ``at`` argument. You can also ask to look ``before`` a datetime
or serial. (It's an error to pass both [#not_both]_) In this example, we're
looking at the database immediately prior to the most recent change to the
root.
>>> serial = conn.root()._p_serial
>>> historical_conn = db.open(
... transaction_manager=transaction1, before=serial)
>>> historical_conn.root().keys()
['first']
>>> historical_conn.root()['first']['count']
0
In fact, ``at`` arguments are translated into ``before`` values because the
underlying mechanism is a storage's loadBefore method. When you look at a
connection's ``before`` attribute, it is normalized into a ``before`` serial,
no matter what you pass into ``db.open``.
>>> print conn.before
None
>>> historical_conn.before == serial
True
>>> conn.close()
Configuration
=============
Like normal connections, the database lets you set how many historical
connections can be active without generating a warning for a given serial, and
how many objects should be kept in each connection's object cache.
>>> db.getHistoricalPoolSize()
3
>>> db.setHistoricalPoolSize(4)
>>> db.getHistoricalPoolSize()
4
>>> db.getHistoricalCacheSize()
1000
>>> db.setHistoricalCacheSize(2000)
>>> db.getHistoricalCacheSize()
2000
In addition, you can specify the minimum number of seconds that an unused
historical connection should be kept.
>>> db.getHistoricalTimeout()
300
>>> db.setHistoricalTimeout(400)
>>> db.getHistoricalTimeout()
400
All three of these values can be specified in a ZConfig file. We're using
mapping storage for simplicity, but remember, as we said at the start of this
document, mapping storage will not work for historical connections (and in fact
may seem to work but then fail confusingly) because it does not implement
loadBefore.
>>> import ZODB.config
>>> db2 = ZODB.config.databaseFromString('''
... <zodb>
... <mappingstorage/>
... historical-pool-size 5
... historical-cache-size 1500
... historical-timeout 6m
... </zodb>
... ''')
>>> db2.getHistoricalPoolSize()
5
>>> db2.getHistoricalCacheSize()
1500
>>> db2.getHistoricalTimeout()
360
Let's actually look at these values at work by shining some light into what
has been a black box up to now. We'll actually do some white box examination
of what is going on in the database, pools and connections.
First we'll clean out all the old historical pools so we have a clean slate.
>>> historical_conn.close()
>>> db.removeHistoricalPool(at=now)
>>> db.removeHistoricalPool(at=historical_serial)
>>> db.removeHistoricalPool(before=serial)
Now lets look what happens to the pools when we create an historical
connection.
>>> pools = db._pools
>>> len(pools)
1
>>> pools.keys()
['']
>>> historical_conn = db.open(
... transaction_manager=transaction1, before=serial)
>>> len(pools)
2
>>> set(pools.keys()) == set(('', serial))
True
>>> pool = pools[serial]
>>> len(pool.all)
1
>>> len(pool.available)
0
If you change the historical cache size, that changes the size of the
persistent cache on our connection.
>>> historical_conn._cache.cache_size
2000
>>> db.setHistoricalCacheSize(1500)
>>> historical_conn._cache.cache_size
1500
Now let's look at pool sizes. We'll set it to two, then make and close three
connections. We should end up with only two available connections.
>>> db.setHistoricalPoolSize(2)
>>> transaction2 = transaction.TransactionManager()
>>> historical_conn2 = db.open(
... transaction_manager=transaction2, before=serial)
>>> len(pools)
2
>>> len(pool.all)
2
>>> len(pool.available)
0
>>> transaction3 = transaction.TransactionManager()
>>> historical_conn3 = db.open(
... transaction_manager=transaction3, before=serial)
>>> len(pools)
2
>>> len(pool.all)
3
>>> len(pool.available)
0
>>> historical_conn3.close()
>>> len(pool.all)
3
>>> len(pool.available)
1
>>> historical_conn2.close()
>>> len(pool.all)
3
>>> len(pool.available)
2
>>> historical_conn.close()
>>> len(pool.all)
2
>>> len(pool.available)
2
Finally, we'll look at the timeout. We'll need to monkeypatch ``time`` for
this. (The funky __import__ of DB is because some ZODB __init__ shenanigans
make the DB class mask the DB module.)
>>> import time
>>> delta = 200
>>> def stub_time():
... return time.time() + delta
...
>>> DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
>>> original_time = DB_module.time
>>> DB_module.time = stub_time
>>> historical_conn = db.open(before=serial)
>>> len(pools)
2
>>> len(pool.all)
2
>>> len(pool.available)
1
A close or an open will do garbage collection on the timed out connections.
>>> delta += 200
>>> historical_conn.close()
>>> len(pools)
2
>>> len(pool.all)
1
>>> len(pool.available)
1
An open also does garbage collection on the pools themselves.
>>> delta += 400
>>> conn = db.open() # normal connection
>>> len(pools)
1
>>> len(pool.all)
0
>>> len(pool.available)
0
>>> serial in pools
False
Invalidations
=============
In general, invalidations are ignored for historical connections, assuming
that you have really specified a point in history. This is another white box
test.
>>> historical_conn = db.open(
... transaction_manager=transaction1, at=serial)
>>> sorted(conn.root().keys())
['first', 'second']
>>> conn.root()['first']['count']
1
>>> sorted(historical_conn.root().keys())
['first', 'second']
>>> historical_conn.root()['first']['count']
1
>>> conn.root()['first']['count'] += 1
>>> conn.root()['third'] = persistent.mapping.PersistentMapping()
>>> transaction.commit()
>>> len(historical_conn._invalidated)
0
>>> historical_conn.close()
If you specify a time in the future, you get a read-only connection that
invalidates, rather than an error. The main reason for this is that, in some
cases, the most recent transaction id is in the future, so there's not an easy
way to reasonably disallow values. Beyond that, it's useful to have readonly
connections, though this spelling isn't quite appealing for the general case.
This "future history" also works with MVCC.
>>> THE_FUTURE = datetime.datetime(2038, 1, 19)
>>> historical_conn = db.open(
... transaction_manager=transaction1, at=THE_FUTURE)
>>> conn.root()['first']['count'] += 1
>>> conn.root()['fourth'] = persistent.mapping.PersistentMapping()
>>> transaction.commit()
>>> len(historical_conn._invalidated)
2
>>> historical_conn.root()['first']['count'] # MVCC
2
>>> historical_conn.sync()
>>> len(historical_conn._invalidated)
0
>>> historical_conn.root()['first']['count']
3
>>> historical_conn.root()['first']['count'] = 0
>>> transaction1.commit()
Traceback (most recent call last):
...
ReadOnlyHistoryError
>>> transaction1.abort()
>>> historical_conn.close()
Warnings
========
First, if you use datetimes to get a historical connection, be aware that the
conversion from datetime to transaction id has some pitfalls. Generally, the
transaction ids in the database are only as time-accurate as the system clock
was when the transaction id was created. Moreover, leap seconds are handled
somewhat naively in the ZODB (largely because they are handled naively in Unix/
POSIX time) so any minute that contains a leap second may contain serials that
are a bit off. This is not generally a problem for the ZODB, because serials
are guaranteed to increase, but it does highlight the fact that serials are not
guaranteed to be accurately connected to time. Generally, they are about as
reliable as time.time.
Second, historical connections currently introduce potentially wide variance in
memory requirements for the applications. Since you can open up many
connections to different serials, and each gets their own pool, you may collect
quite a few connections. For now, at least, if you use this feature you need to
be particularly careful of your memory usage. Get rid of pools when you know
you can, and reuse the exact same values for ``at`` or ``before`` when
possible. If historical connections are used for conflict resolution, these
connections will probably be temporary--not saved in a pool--so that the extra
memory usage would also be brief and unlikely to overlap.
.. ......... ..
.. Footnotes ..
.. ......... ..
.. [#not_both] It is an error to try and pass both `at` and `before`.
>>> historical_conn = db.open(
... transaction_manager=transaction1, at=now, before=historical_serial)
Traceback (most recent call last):
...
ValueError: can only pass zero or one of `at` and `before`
\ No newline at end of file
......@@ -34,9 +34,10 @@ class IConnection(Interface):
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
A Connection can be associated with a single version when it is
created. By default, a Connection is not associated with a
version; it uses non-version data.
A Connection can be frozen to a serial--a transaction id, a single point in
history-- when it is created. By default, a Connection is not associated
with a serial; it uses current data. A Connection frozen to a serial is
read-only.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
......@@ -101,8 +102,7 @@ class IConnection(Interface):
User Methods:
root, get, add, close, db, sync, isReadOnly, cacheGC,
cacheFullSweep, cacheMinimize, getVersion,
modifiedInVersion
cacheFullSweep, cacheMinimize
Experimental Methods:
onCloseCallbacks
......@@ -226,9 +226,6 @@ class IConnection(Interface):
The root is a persistent.mapping.PersistentMapping.
"""
def getVersion():
"""Returns the version this connection is attached to."""
# Multi-database support.
connections = Attribute(
......@@ -325,7 +322,7 @@ class IStorageDB(Interface):
there would be so many that it would be inefficient to do so.
"""
def invalidate(transaction_id, oids, version=''):
def invalidate(transaction_id, oids):
"""Invalidate object ids committed by the given transaction
The oids argument is an iterable of object identifiers.
......@@ -356,13 +353,15 @@ class IDatabase(IStorageDB):
entry.
""")
def open(version='', transaction_manager=None):
def open(transaction_manager=None, serial=''):
"""Return an IConnection object for use by application code.
version: the "version" that all changes will be made
in, defaults to no version.
transaction_manager: transaction manager to use. None means
use the default transaction manager.
serial: the serial (transaction id) of the database to open.
An empty string (the default) means to open it to the newest
serial. Specifying a serial results in a read-only historical
connection.
Note that the connection pool is managed as a stack, to
increase the likelihood that the connection's stack will
......@@ -441,7 +440,7 @@ class IStorage(Interface):
This is used soley for informational purposes.
"""
def history(oid, version, size=1):
def history(oid, size=1):
"""Return a sequence of history information dictionaries.
Up to size objects (including no objects) may be returned.
......@@ -457,10 +456,6 @@ class IStorage(Interface):
tid
The transaction identifier of the transaction that
committed the version.
version
The version that the revision is in. If the storage
doesn't support versions, then this must be an empty
string.
user_name
The user identifier, if any (or an empty string) of the
user on whos behalf the revision was committed.
......@@ -491,18 +486,14 @@ class IStorage(Interface):
This is used soley for informational purposes.
"""
def load(oid, version):
"""Load data for an object id and version
def load(oid):
"""Load data for an object id
A data record and serial are returned. The serial is a
transaction identifier of the transaction that wrote the data
record.
A POSKeyError is raised if there is no record for the object
id and version.
Storages that don't support versions must ignore the version
argument.
A POSKeyError is raised if there is no record for the object id.
"""
def loadBefore(oid, tid):
......@@ -575,7 +566,7 @@ class IStorage(Interface):
has a reasonable chance of being unique.
"""
def store(oid, serial, data, version, transaction):
def store(oid, serial, data, transaction):
"""Store data for the object id, oid.
Arguments:
......@@ -594,11 +585,6 @@ class IStorage(Interface):
data
The data record. This is opaque to the storage.
version
The version to store the data is. If the storage doesn't
support versions, this should be an empty string and the
storage is allowed to ignore it.
transaction
A transaction object. This should match the current
transaction for the storage, set by tpc_begin.
......@@ -707,7 +693,7 @@ class IStorageRestoreable(IStorage):
# failed to take into account records after the pack time.
def restore(oid, serial, data, version, prev_txn, transaction):
def restore(oid, serial, data, prev_txn, transaction):
"""Write data already committed in a separate database
The restore method is used when copying data from one database
......@@ -727,9 +713,6 @@ class IStorageRestoreable(IStorage):
The record data. This will be None if the transaction
undid the creation of the object.
version
The version identifier for the record
prev_txn
The identifier of a previous transaction that held the
object data. The target storage can sometimes use this
......@@ -746,7 +729,6 @@ class IStorageRecordInformation(Interface):
"""
oid = Attribute("The object id")
version = Attribute("The version")
data = Attribute("The data record")
class IStorageTransactionInformation(Interface):
......@@ -936,7 +918,7 @@ class IBlob(Interface):
class IBlobStorage(Interface):
"""A storage supporting BLOBs."""
def storeBlob(oid, oldserial, data, blob, version, transaction):
def storeBlob(oid, oldserial, data, blob, transaction):
"""Stores data that has a BLOB attached."""
def loadBlob(oid, serial):
......
......@@ -371,7 +371,7 @@ class ObjectWriter:
return oid
# Note that we never get here for persistent classes.
# We'll use driect refs for normal classes.
# We'll use direct refs for normal classes.
if database_name:
return ['m', (database_name, oid, klass)]
......
......@@ -394,153 +394,6 @@ class VersionStorage:
self._storage.tpc_finish(t)
self.assertEqual(oids, [oid])
def checkPackVersions(self):
db = DB(self._storage)
cn = db.open(version="testversion")
root = cn.root()
obj = root["obj"] = MinPO("obj")
root["obj2"] = MinPO("obj2")
txn = transaction.get()
txn.note("create 2 objs in version")
txn.commit()
obj.value = "77"
txn = transaction.get()
txn.note("modify obj in version")
txn.commit()
# undo the modification to generate a mix of backpointers
# and versions for pack to chase
info = db.undoInfo()
db.undo(info[0]["id"])
txn = transaction.get()
txn.note("undo modification")
txn.commit()
snooze()
self._storage.pack(time.time(), referencesf)
db.commitVersion("testversion")
txn = transaction.get()
txn.note("commit version")
txn.commit()
cn = db.open()
root = cn.root()
root["obj"] = "no version"
txn = transaction.get()
txn.note("modify obj")
txn.commit()
self._storage.pack(time.time(), referencesf)
def checkPackVersionsInPast(self):
db = DB(self._storage)
cn = db.open(version="testversion")
root = cn.root()
obj = root["obj"] = MinPO("obj")
root["obj2"] = MinPO("obj2")
txn = transaction.get()
txn.note("create 2 objs in version")
txn.commit()
obj.value = "77"
txn = transaction.get()
txn.note("modify obj in version")
txn.commit()
t0 = time.time()
snooze()
# undo the modification to generate a mix of backpointers
# and versions for pack to chase
info = db.undoInfo()
db.undo(info[0]["id"])
txn = transaction.get()
txn.note("undo modification")
txn.commit()
self._storage.pack(t0, referencesf)
db.commitVersion("testversion")
txn = transaction.get()
txn.note("commit version")
txn.commit()
cn = db.open()
root = cn.root()
root["obj"] = "no version"
txn = transaction.get()
txn.note("modify obj")
txn.commit()
self._storage.pack(time.time(), referencesf)
def checkPackVersionReachable(self):
db = DB(self._storage)
cn = db.open()
root = cn.root()
names = "a", "b", "c"
for name in names:
root[name] = MinPO(name)
transaction.commit()
for name in names:
cn2 = db.open(version=name)
rt2 = cn2.root()
obj = rt2[name]
obj.value = MinPO("version")
transaction.commit()
cn2.close()
root["d"] = MinPO("d")
transaction.commit()
snooze()
self._storage.pack(time.time(), referencesf)
cn.sync()
# make sure all the non-version data is there
for name, obj in root.items():
self.assertEqual(name, obj.value)
# make sure all the version-data is there,
# and create a new revision in the version
for name in names:
cn2 = db.open(version=name)
rt2 = cn2.root()
obj = rt2[name].value
self.assertEqual(obj.value, "version")
obj.value = "still version"
transaction.commit()
cn2.close()
db.abortVersion("b")
txn = transaction.get()
txn.note("abort version b")
txn.commit()
t = time.time()
snooze()
L = db.undoInfo()
db.undo(L[0]["id"])
txn = transaction.get()
txn.note("undo abort")
txn.commit()
self._storage.pack(t, referencesf)
cn2 = db.open(version="b")
rt2 = cn2.root()
self.assertEqual(rt2["b"].value.value, "still version")
def checkLoadBeforeVersion(self):
eq = self.assertEqual
oid = self._storage.new_oid()
......
......@@ -239,12 +239,12 @@ Closing connections adds them to the stack:
Closing another one will purge the one with MARKER 0 from the stack
(since it was the first added to the stack):
>>> [c.MARKER for c in pool.available]
>>> [c.MARKER for c in pool.available.values()]
[0, 1, 2]
>>> conns[0].close() # MARKER 3
>>> len(pool.available), len(pool.all)
(3, 5)
>>> [c.MARKER for c in pool.available]
>>> [c.MARKER for c in pool.available.values()]
[1, 2, 3]
Similarly for the other two:
......@@ -252,7 +252,7 @@ Similarly for the other two:
>>> conns[1].close(); conns[2].close()
>>> len(pool.available), len(pool.all)
(3, 3)
>>> [c.MARKER for c in pool.available]
>>> [c.MARKER for c in pool.available.values()]
[3, 4, 5]
Reducing the pool size may also purge the oldest closed connections:
......@@ -260,7 +260,7 @@ Reducing the pool size may also purge the oldest closed connections:
>>> db.setPoolSize(2) # gets rid of MARKER 3
>>> len(pool.available), len(pool.all)
(2, 2)
>>> [c.MARKER for c in pool.available]
>>> [c.MARKER for c in pool.available.values()]
[4, 5]
Since MARKER 5 is still the last one added to the stack, it will be the
......
......@@ -14,7 +14,7 @@
import os
import time
import unittest
import warnings
import datetime
import transaction
......@@ -35,8 +35,6 @@ class DBTests(unittest.TestCase):
self.__path = os.path.abspath('test.fs')
store = ZODB.FileStorage.FileStorage(self.__path)
self.db = ZODB.DB(store)
warnings.filterwarnings(
'ignore', message='Versions are deprecated', module=__name__)
def tearDown(self):
self.db.close()
......@@ -44,8 +42,8 @@ class DBTests(unittest.TestCase):
if os.path.exists(self.__path+s):
os.remove(self.__path+s)
def dowork(self, version=''):
c = self.db.open(version)
def dowork(self):
c = self.db.open()
r = c.root()
o = r[time.time()] = MinPO(0)
transaction.commit()
......@@ -53,85 +51,95 @@ class DBTests(unittest.TestCase):
o.value = MinPO(i)
transaction.commit()
o = o.value
serial = o._p_serial
root_serial = r._p_serial
c.close()
return serial, root_serial
# make sure the basic methods are callable
def testSets(self):
self.db.setCacheSize(15)
self.db.setVersionCacheSize(15)
self.db.setHistoricalCacheSize(15)
def test_removeVersionPool(self):
# Test that we can remove a version pool
def test_removeHistoricalPool(self):
# Test that we can remove a historical pool
# This is white box because we check some internal data structures
self.dowork()
self.dowork('v2')
c1 = self.db.open('v1')
serial1, root_serial1 = self.dowork()
now = datetime.datetime.utcnow()
serial2, root_serial2 = self.dowork()
self.failUnless(root_serial1 < root_serial2)
c1 = self.db.open(at=now)
root = c1.root()
root.keys() # wake up object to get proper serial set
self.assertEqual(root._p_serial, root_serial1)
c1.close() # return to pool
c12 = self.db.open('v1')
c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is c12) # should be same
pools = self.db._pools
self.assertEqual(len(pools), 3)
self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1')
self.db.removeHistoricalPool(at=now)
self.assertEqual(len(pools), 1)
self.assertEqual(nconn(pools), 1)
c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
self.assertEqual(nconn(pools), 3)
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
def _test_for_leak(self):
self.dowork()
self.dowork('v2')
now = datetime.datetime.utcnow()
self.dowork()
while 1:
c1 = self.db.open('v1')
self.db.removeVersionPool('v1')
c1 = self.db.open(at=now)
self.db.removeHistoricalPool(at=now)
c1.close() # return to pool
def test_removeVersionPool_while_connection_open(self):
def test_removeHistoricalPool_while_connection_open(self):
# Test that we can remove a version pool
# This is white box because we check some internal data structures
self.dowork()
self.dowork('v2')
c1 = self.db.open('v1')
now = datetime.datetime.utcnow()
self.dowork()
c1 = self.db.open(at=now)
c1.close() # return to pool
c12 = self.db.open('v1')
c12 = self.db.open(at=now)
self.assert_(c1 is c12) # should be same
pools = self.db._pools
self.assertEqual(len(pools), 3)
self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
self.db.removeHistoricalPool(at=now)
self.assertEqual(len(pools), 1)
self.assertEqual(nconn(pools), 1)
c12.close() # should leave pools alone
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
self.assertEqual(len(pools), 1)
self.assertEqual(nconn(pools), 1)
c12 = self.db.open('v1')
c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
self.assertEqual(nconn(pools), 3)
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
def test_references(self):
......
......@@ -136,20 +136,6 @@ class ZODBTests(unittest.TestCase):
def checkExportImportAborted(self):
self.checkExportImport(abort_it=True)
def checkVersionOnly(self):
# Make sure the changes to make empty transactions a no-op
# still allow things like abortVersion(). This should work
# because abortVersion() calls tpc_begin() itself.
conn = self._db.open("version")
try:
r = conn.root()
r[1] = 1
transaction.commit()
finally:
conn.close()
self._db.abortVersion("version")
transaction.commit()
def checkResetCache(self):
# The cache size after a reset should be 0. Note that
# _resetCache is not a public API, but the resetCaches()
......
##############################################################################
#
# Copyright (c) 2007 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id$
"""
import unittest
from zope.testing import doctest, module
def setUp(test):
module.setUp(test, 'historical_connections_txt')
def tearDown(test):
test.globs['db'].close()
test.globs['db2'].close()
test.globs['storage'].close()
test.globs['storage'].cleanup()
# the DB class masks the module because of __init__ shenanigans
DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
DB_module.time = test.globs['original_time']
module.tearDown(test)
def test_suite():
return unittest.TestSuite((
doctest.DocFileSuite('../historical_connections.txt',
setUp=setUp,
tearDown=tearDown,
optionflags=doctest.INTERPRET_FOOTNOTES,
),
))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment