Commit 5f2b2d70 authored by Gary Poster's avatar Gary Poster

incrementally remove some version support (this time from connection and DB);...

incrementally remove some version support (this time from connection and DB); add support for historical read-only connections.
parent 418dd54c
......@@ -28,6 +28,14 @@ General
using `int` for memory sizes which caused errors on x86_64 Intel Xeon
machines (using 64-bit Linux).
- (unreleased, after 3.9.0a1) Removed version support from connections and
DB. Versions are still in the storages; this is an incremental step.
- (unreleased, after 3.9.0a1) Added support for read-only, historical
connections based on datetimes or serials (TIDs). See
src/ZODB/historical_connections.txt.
ZEO
---
......
......@@ -46,7 +46,7 @@ class MappingStorageConfig:
def getConfig(self, path, create, read_only):
return """<mappingstorage 1/>"""
class FileStorageConnectionTests(
FileStorageConfig,
ConnectionTests.ConnectionTests,
......
......@@ -4,7 +4,7 @@ ZEO Fan Out
We should be able to set up ZEO servers with ZEO clients. Let's see
if we can make it work.
We'll use some helper functions. The first is a helpter that starts
We'll use some helper functions. The first is a helper that starts
ZEO servers for us and another one that picks ports.
We'll start the first server:
......@@ -16,7 +16,7 @@ We'll start the first server:
... '<filestorage 1>\n path fs\n</filestorage>\n', zconf0, port0)
Then we''ll start 2 others that use this one:
Then we'll start 2 others that use this one:
>>> port1 = ZEO.tests.testZEO.get_port()
>>> zconf1 = ZEO.tests.forker.ZEOConfig(('', port1))
......
This diff is collapsed.
This diff is collapsed.
......@@ -47,7 +47,7 @@ class ExportImport:
continue
done_oids[oid] = True
try:
p, serial = load(oid, self._version)
p, serial = load(oid, '')
except:
logger.debug("broken reference for oid %s", repr(oid),
exc_info=True)
......@@ -55,16 +55,16 @@ class ExportImport:
referencesf(p, oids)
f.writelines([oid, p64(len(p)), p])
if supports_blobs:
if not isinstance(self._reader.getGhost(p), Blob):
continue # not a blob
blobfilename = self._storage.loadBlob(oid, serial)
f.write(blob_begin_marker)
f.write(p64(os.stat(blobfilename).st_size))
blobdata = open(blobfilename, "rb")
cp(blobdata, f)
blobdata.close()
if supports_blobs:
if not isinstance(self._reader.getGhost(p), Blob):
continue # not a blob
blobfilename = self._storage.loadBlob(oid, serial)
f.write(blob_begin_marker)
f.write(p64(os.stat(blobfilename).st_size))
blobdata = open(blobfilename, "rb")
cp(blobdata, f)
blobdata.close()
f.write(export_end_marker)
return f
......@@ -127,8 +127,6 @@ class ExportImport:
return Ghost(oid)
version = self._version
while 1:
header = f.read(16)
if header == export_end_marker:
......@@ -180,9 +178,9 @@ class ExportImport:
if blob_filename is not None:
self._storage.storeBlob(oid, None, data, blob_filename,
version, transaction)
'', transaction)
else:
self._storage.store(oid, None, data, version, transaction)
self._storage.store(oid, None, data, '', transaction)
export_end_marker = '\377'*16
......
......@@ -234,6 +234,10 @@ class DanglingReferenceError(TransactionError):
return "from %s to %s" % (oid_repr(self.referer),
oid_repr(self.missing))
############################################################################
# Only used in storages; versions are no longer supported.
class VersionError(POSError):
"""An error in handling versions occurred."""
......@@ -246,6 +250,7 @@ class VersionLockError(VersionError, TransactionError):
An attempt was made to modify an object that has been modified in an
unsaved version.
"""
############################################################################
class UndoError(POSError):
"""An attempt was made to undo a non-undoable transaction."""
......@@ -292,6 +297,9 @@ class ExportError(POSError):
class Unsupported(POSError):
"""A feature was used that is not supported by the storage."""
class ReadOnlyHistoryError(POSError):
"""Unable to add or modify objects in an historical connection."""
class InvalidObjectReference(POSError):
"""An object contains an invalid reference to another object.
......
......@@ -180,16 +180,22 @@
and exceeding twice pool-size connections causes a critical
message to be logged.
</description>
<key name="version-pool-size" datatype="integer" default="3"/>
<key name="historical-pool-size" datatype="integer" default="3"/>
<description>
The expected maximum number of connections simultaneously open
per version.
per historical revision.
</description>
<key name="version-cache-size" datatype="integer" default="100"/>
<key name="historical-cache-size" datatype="integer" default="1000"/>
<description>
Target size, in number of objects, of each version connection's
Target size, in number of objects, of each historical connection's
object cache.
</description>
<key name="historical-timeout" datatype="time-interval"
default="5m"/>
<description>
The minimum interval that an unused historical connection should be
kept.
</description>
<key name="database-name" default="unnamed"/>
<description>
When multidatabases are in use, this is the name given to this
......
......@@ -68,7 +68,6 @@ def storageFromURL(url):
def storageFromConfig(section):
return section.open()
class BaseConfig:
"""Object representing a configured storage or database.
......@@ -99,8 +98,9 @@ class ZODBDatabase(BaseConfig):
return ZODB.DB(storage,
pool_size=section.pool_size,
cache_size=section.cache_size,
version_pool_size=section.version_pool_size,
version_cache_size=section.version_cache_size,
historical_pool_size=section.historical_pool_size,
historical_cache_size=section.historical_cache_size,
historical_timeout=section.historical_timeout,
database_name=section.database_name,
databases=databases)
except:
......
This diff is collapsed.
......@@ -34,9 +34,10 @@ class IConnection(Interface):
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
A Connection can be associated with a single version when it is
created. By default, a Connection is not associated with a
version; it uses non-version data.
A Connection can be frozen to a serial--a transaction id, a single point in
history-- when it is created. By default, a Connection is not associated
with a serial; it uses current data. A Connection frozen to a serial is
read-only.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
......@@ -101,8 +102,7 @@ class IConnection(Interface):
User Methods:
root, get, add, close, db, sync, isReadOnly, cacheGC,
cacheFullSweep, cacheMinimize, getVersion,
modifiedInVersion
cacheFullSweep, cacheMinimize
Experimental Methods:
onCloseCallbacks
......@@ -226,9 +226,6 @@ class IConnection(Interface):
The root is a persistent.mapping.PersistentMapping.
"""
def getVersion():
"""Returns the version this connection is attached to."""
# Multi-database support.
connections = Attribute(
......@@ -325,7 +322,7 @@ class IStorageDB(Interface):
there would be so many that it would be inefficient to do so.
"""
def invalidate(transaction_id, oids, version=''):
def invalidate(transaction_id, oids):
"""Invalidate object ids committed by the given transaction
The oids argument is an iterable of object identifiers.
......@@ -356,13 +353,15 @@ class IDatabase(IStorageDB):
entry.
""")
def open(version='', transaction_manager=None):
def open(transaction_manager=None, serial=''):
"""Return an IConnection object for use by application code.
version: the "version" that all changes will be made
in, defaults to no version.
transaction_manager: transaction manager to use. None means
use the default transaction manager.
serial: the serial (transaction id) of the database to open.
An empty string (the default) means to open it to the newest
serial. Specifying a serial results in a read-only historical
connection.
Note that the connection pool is managed as a stack, to
increase the likelihood that the connection's stack will
......@@ -441,7 +440,7 @@ class IStorage(Interface):
This is used soley for informational purposes.
"""
def history(oid, version, size=1):
def history(oid, size=1):
"""Return a sequence of history information dictionaries.
Up to size objects (including no objects) may be returned.
......@@ -457,10 +456,6 @@ class IStorage(Interface):
tid
The transaction identifier of the transaction that
committed the version.
version
The version that the revision is in. If the storage
doesn't support versions, then this must be an empty
string.
user_name
The user identifier, if any (or an empty string) of the
user on whos behalf the revision was committed.
......@@ -491,18 +486,14 @@ class IStorage(Interface):
This is used soley for informational purposes.
"""
def load(oid, version):
"""Load data for an object id and version
def load(oid):
"""Load data for an object id
A data record and serial are returned. The serial is a
transaction identifier of the transaction that wrote the data
record.
A POSKeyError is raised if there is no record for the object
id and version.
Storages that don't support versions must ignore the version
argument.
A POSKeyError is raised if there is no record for the object id.
"""
def loadBefore(oid, tid):
......@@ -575,7 +566,7 @@ class IStorage(Interface):
has a reasonable chance of being unique.
"""
def store(oid, serial, data, version, transaction):
def store(oid, serial, data, transaction):
"""Store data for the object id, oid.
Arguments:
......@@ -594,11 +585,6 @@ class IStorage(Interface):
data
The data record. This is opaque to the storage.
version
The version to store the data is. If the storage doesn't
support versions, this should be an empty string and the
storage is allowed to ignore it.
transaction
A transaction object. This should match the current
transaction for the storage, set by tpc_begin.
......@@ -707,7 +693,7 @@ class IStorageRestoreable(IStorage):
# failed to take into account records after the pack time.
def restore(oid, serial, data, version, prev_txn, transaction):
def restore(oid, serial, data, prev_txn, transaction):
"""Write data already committed in a separate database
The restore method is used when copying data from one database
......@@ -727,9 +713,6 @@ class IStorageRestoreable(IStorage):
The record data. This will be None if the transaction
undid the creation of the object.
version
The version identifier for the record
prev_txn
The identifier of a previous transaction that held the
object data. The target storage can sometimes use this
......@@ -746,7 +729,6 @@ class IStorageRecordInformation(Interface):
"""
oid = Attribute("The object id")
version = Attribute("The version")
data = Attribute("The data record")
class IStorageTransactionInformation(Interface):
......@@ -936,7 +918,7 @@ class IBlob(Interface):
class IBlobStorage(Interface):
"""A storage supporting BLOBs."""
def storeBlob(oid, oldserial, data, blob, version, transaction):
def storeBlob(oid, oldserial, data, blob, transaction):
"""Stores data that has a BLOB attached."""
def loadBlob(oid, serial):
......
......@@ -371,7 +371,7 @@ class ObjectWriter:
return oid
# Note that we never get here for persistent classes.
# We'll use driect refs for normal classes.
# We'll use direct refs for normal classes.
if database_name:
return ['m', (database_name, oid, klass)]
......
......@@ -394,153 +394,6 @@ class VersionStorage:
self._storage.tpc_finish(t)
self.assertEqual(oids, [oid])
def checkPackVersions(self):
db = DB(self._storage)
cn = db.open(version="testversion")
root = cn.root()
obj = root["obj"] = MinPO("obj")
root["obj2"] = MinPO("obj2")
txn = transaction.get()
txn.note("create 2 objs in version")
txn.commit()
obj.value = "77"
txn = transaction.get()
txn.note("modify obj in version")
txn.commit()
# undo the modification to generate a mix of backpointers
# and versions for pack to chase
info = db.undoInfo()
db.undo(info[0]["id"])
txn = transaction.get()
txn.note("undo modification")
txn.commit()
snooze()
self._storage.pack(time.time(), referencesf)
db.commitVersion("testversion")
txn = transaction.get()
txn.note("commit version")
txn.commit()
cn = db.open()
root = cn.root()
root["obj"] = "no version"
txn = transaction.get()
txn.note("modify obj")
txn.commit()
self._storage.pack(time.time(), referencesf)
def checkPackVersionsInPast(self):
db = DB(self._storage)
cn = db.open(version="testversion")
root = cn.root()
obj = root["obj"] = MinPO("obj")
root["obj2"] = MinPO("obj2")
txn = transaction.get()
txn.note("create 2 objs in version")
txn.commit()
obj.value = "77"
txn = transaction.get()
txn.note("modify obj in version")
txn.commit()
t0 = time.time()
snooze()
# undo the modification to generate a mix of backpointers
# and versions for pack to chase
info = db.undoInfo()
db.undo(info[0]["id"])
txn = transaction.get()
txn.note("undo modification")
txn.commit()
self._storage.pack(t0, referencesf)
db.commitVersion("testversion")
txn = transaction.get()
txn.note("commit version")
txn.commit()
cn = db.open()
root = cn.root()
root["obj"] = "no version"
txn = transaction.get()
txn.note("modify obj")
txn.commit()
self._storage.pack(time.time(), referencesf)
def checkPackVersionReachable(self):
db = DB(self._storage)
cn = db.open()
root = cn.root()
names = "a", "b", "c"
for name in names:
root[name] = MinPO(name)
transaction.commit()
for name in names:
cn2 = db.open(version=name)
rt2 = cn2.root()
obj = rt2[name]
obj.value = MinPO("version")
transaction.commit()
cn2.close()
root["d"] = MinPO("d")
transaction.commit()
snooze()
self._storage.pack(time.time(), referencesf)
cn.sync()
# make sure all the non-version data is there
for name, obj in root.items():
self.assertEqual(name, obj.value)
# make sure all the version-data is there,
# and create a new revision in the version
for name in names:
cn2 = db.open(version=name)
rt2 = cn2.root()
obj = rt2[name].value
self.assertEqual(obj.value, "version")
obj.value = "still version"
transaction.commit()
cn2.close()
db.abortVersion("b")
txn = transaction.get()
txn.note("abort version b")
txn.commit()
t = time.time()
snooze()
L = db.undoInfo()
db.undo(L[0]["id"])
txn = transaction.get()
txn.note("undo abort")
txn.commit()
self._storage.pack(t, referencesf)
cn2 = db.open(version="b")
rt2 = cn2.root()
self.assertEqual(rt2["b"].value.value, "still version")
def checkLoadBeforeVersion(self):
eq = self.assertEqual
oid = self._storage.new_oid()
......
......@@ -239,12 +239,12 @@ Closing connections adds them to the stack:
Closing another one will purge the one with MARKER 0 from the stack
(since it was the first added to the stack):
>>> [c.MARKER for c in pool.available]
>>> [c.MARKER for c in pool.available.values()]
[0, 1, 2]
>>> conns[0].close() # MARKER 3
>>> len(pool.available), len(pool.all)
(3, 5)
>>> [c.MARKER for c in pool.available]
>>> [c.MARKER for c in pool.available.values()]
[1, 2, 3]
Similarly for the other two:
......@@ -252,7 +252,7 @@ Similarly for the other two:
>>> conns[1].close(); conns[2].close()
>>> len(pool.available), len(pool.all)
(3, 3)
>>> [c.MARKER for c in pool.available]
>>> [c.MARKER for c in pool.available.values()]
[3, 4, 5]
Reducing the pool size may also purge the oldest closed connections:
......@@ -260,7 +260,7 @@ Reducing the pool size may also purge the oldest closed connections:
>>> db.setPoolSize(2) # gets rid of MARKER 3
>>> len(pool.available), len(pool.all)
(2, 2)
>>> [c.MARKER for c in pool.available]
>>> [c.MARKER for c in pool.available.values()]
[4, 5]
Since MARKER 5 is still the last one added to the stack, it will be the
......
......@@ -14,7 +14,7 @@
import os
import time
import unittest
import warnings
import datetime
import transaction
......@@ -35,8 +35,6 @@ class DBTests(unittest.TestCase):
self.__path = os.path.abspath('test.fs')
store = ZODB.FileStorage.FileStorage(self.__path)
self.db = ZODB.DB(store)
warnings.filterwarnings(
'ignore', message='Versions are deprecated', module=__name__)
def tearDown(self):
self.db.close()
......@@ -44,8 +42,8 @@ class DBTests(unittest.TestCase):
if os.path.exists(self.__path+s):
os.remove(self.__path+s)
def dowork(self, version=''):
c = self.db.open(version)
def dowork(self):
c = self.db.open()
r = c.root()
o = r[time.time()] = MinPO(0)
transaction.commit()
......@@ -53,85 +51,95 @@ class DBTests(unittest.TestCase):
o.value = MinPO(i)
transaction.commit()
o = o.value
serial = o._p_serial
root_serial = r._p_serial
c.close()
return serial, root_serial
# make sure the basic methods are callable
def testSets(self):
self.db.setCacheSize(15)
self.db.setVersionCacheSize(15)
self.db.setHistoricalCacheSize(15)
def test_removeVersionPool(self):
# Test that we can remove a version pool
def test_removeHistoricalPool(self):
# Test that we can remove a historical pool
# This is white box because we check some internal data structures
self.dowork()
self.dowork('v2')
c1 = self.db.open('v1')
serial1, root_serial1 = self.dowork()
now = datetime.datetime.utcnow()
serial2, root_serial2 = self.dowork()
self.failUnless(root_serial1 < root_serial2)
c1 = self.db.open(at=now)
root = c1.root()
root.keys() # wake up object to get proper serial set
self.assertEqual(root._p_serial, root_serial1)
c1.close() # return to pool
c12 = self.db.open('v1')
c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is c12) # should be same
pools = self.db._pools
self.assertEqual(len(pools), 3)
self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1')
self.db.removeHistoricalPool(at=now)
self.assertEqual(len(pools), 1)
self.assertEqual(nconn(pools), 1)
c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
self.assertEqual(nconn(pools), 3)
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
def _test_for_leak(self):
self.dowork()
self.dowork('v2')
now = datetime.datetime.utcnow()
self.dowork()
while 1:
c1 = self.db.open('v1')
self.db.removeVersionPool('v1')
c1 = self.db.open(at=now)
self.db.removeHistoricalPool(at=now)
c1.close() # return to pool
def test_removeVersionPool_while_connection_open(self):
def test_removeHistoricalPool_while_connection_open(self):
# Test that we can remove a version pool
# This is white box because we check some internal data structures
self.dowork()
self.dowork('v2')
c1 = self.db.open('v1')
now = datetime.datetime.utcnow()
self.dowork()
c1 = self.db.open(at=now)
c1.close() # return to pool
c12 = self.db.open('v1')
c12 = self.db.open(at=now)
self.assert_(c1 is c12) # should be same
pools = self.db._pools
self.assertEqual(len(pools), 3)
self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
self.db.removeHistoricalPool(at=now)
self.assertEqual(len(pools), 1)
self.assertEqual(nconn(pools), 1)
c12.close() # should leave pools alone
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
self.assertEqual(len(pools), 1)
self.assertEqual(nconn(pools), 1)
c12 = self.db.open('v1')
c12 = self.db.open(at=now)
c12.close() # return to pool
self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3)
self.assertEqual(nconn(pools), 3)
self.assertEqual(len(pools), 2)
self.assertEqual(nconn(pools), 2)
def test_references(self):
......
......@@ -136,20 +136,6 @@ class ZODBTests(unittest.TestCase):
def checkExportImportAborted(self):
self.checkExportImport(abort_it=True)
def checkVersionOnly(self):
# Make sure the changes to make empty transactions a no-op
# still allow things like abortVersion(). This should work
# because abortVersion() calls tpc_begin() itself.
conn = self._db.open("version")
try:
r = conn.root()
r[1] = 1
transaction.commit()
finally:
conn.close()
self._db.abortVersion("version")
transaction.commit()
def checkResetCache(self):
# The cache size after a reset should be 0. Note that
# _resetCache is not a public API, but the resetCaches()
......
##############################################################################
#
# Copyright (c) 2007 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id$
"""
import unittest
from zope.testing import doctest, module
def setUp(test):
module.setUp(test, 'historical_connections_txt')
def tearDown(test):
test.globs['db'].close()
test.globs['db2'].close()
test.globs['storage'].close()
test.globs['storage'].cleanup()
# the DB class masks the module because of __init__ shenanigans
DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
DB_module.time = test.globs['original_time']
module.tearDown(test)
def test_suite():
return unittest.TestSuite((
doctest.DocFileSuite('../historical_connections.txt',
setUp=setUp,
tearDown=tearDown,
optionflags=doctest.INTERPRET_FOOTNOTES,
),
))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment