Commit 66cd2dca authored by Jim Fulton's avatar Jim Fulton

Reimplemented DemoStorage to be just a wrapper around a base storage

and a changes storage.
parent 3c6c5ef2
############################################################################## ##############################################################################
# #
# Copyright (c) 2001, 2002 Zope Corporation and Contributors. # Copyright (c) Zope Corporation and Contributors.
# All Rights Reserved. # All Rights Reserved.
# #
# This software is subject to the provisions of the Zope Public License, # This software is subject to the provisions of the Zope Public License,
...@@ -13,592 +13,153 @@ ...@@ -13,592 +13,153 @@
############################################################################## ##############################################################################
"""Demo ZODB storage """Demo ZODB storage
The Demo storage serves two purposes: A demo storage supports demos by allowing a volatile changed database
to be layered over a base database.
- Provide an example implementation of a full storage without The base storage must not change.
distracting storage details,
- Provide a volatile storage that is useful for giving demonstrations.
The demo storage can have a "base" storage that is used in a
read-only fashion. The base storage must not contain version
data.
There are three main data structures:
_data -- Transaction logging information necessary for undo
This is a mapping from transaction id to transaction, where
a transaction is simply a 5-tuple:
packed, user, description, extension_data, records
where extension_data is a dictionary or None and records are the
actual records in chronological order. Packed is a flag
indicating whethe the transaction has been packed or not
_index -- A mapping from oid to record
_vindex -- A mapping from version name to version data
where version data is a mapping from oid to record
A record is a tuple:
oid, pre, vdata, p, tid
where:
oid -- object id
pre -- The previous record for this object (or None)
vdata -- version data
None if not a version, ortherwise:
version, non-version-record
p -- the pickle data or None
tid -- the transaction id that wrote the record
The pickle data will be None for a record for an object created in
an aborted version.
It is instructive to watch what happens to the internal data structures
as changes are made. For example, in Zope, you can create an external
method::
import Zope2
def info(RESPONSE):
RESPONSE['Content-type']= 'text/plain'
return Zope2.DB._storage._splat()
and call it to monitor the storage.
""" """
import random
import cPickle import threading
import base64, time import ZODB.MappingStorage
import ZODB.POSException
import ZODB.BaseStorage import ZODB.utils
import ZODB.interfaces
import zope.interface class DemoStorage:
from ZODB import POSException
from ZODB.utils import z64, oid_repr def __init__(self, name=None, base=None, changes=None):
from persistent.TimeStamp import TimeStamp if base is None:
from BTrees import OOBTree base = ZODB.MappingStorage.MappingStorage()
self.base = base
class DemoStorage(ZODB.BaseStorage.BaseStorage): if changes is None:
"""Demo storage changes = ZODB.MappingStorage.MappingStorage()
self.changes = changes
Demo storages provide useful storages for writing tests because
they store their data in memory and throw away their data if name is None:
(implicitly) when they are closed. name = 'DemoStorage(%r, %r)' % (base.getName(), changes.getName())
self.__name__ = name
They were originally designed to allow demonstrations using base
data provided on a CD. They can optionally wrap an *unchanging* supportsUndo = getattr(changes, 'supportsUndo', None)
base storage. It is critical that the base storage does not if supportsUndo is not None and supportsUndo():
change. Using a changing base storage is not just unsupported, it for meth in ('supportsUndo', 'undo', 'undoLog', 'undoInfo'):
is known not to work and can even lead to serious errors and even setattr(self, meth, getattr(changes, meth))
core dumps.
for meth in (
'_lock_acquire', '_lock_release',
'getSize', 'history', 'isReadOnly', 'registerDB',
'sortKey', 'tpc_begin', 'tpc_abort', 'tpc_finish',
'tpc_transaction', 'tpc_vote',
):
setattr(self, meth, getattr(changes, meth))
lastInvalidations = getattr(changes, 'lastInvalidations', None)
if lastInvalidations is not None:
self.lastInvalidations = lastInvalidations
""" def cleanup(self):
self.base.cleanup()
zope.interface.implements(ZODB.interfaces.IStorageIteration) self.changes.cleanup()
def __init__(self, name='Demo Storage', base=None, quota=None):
ZODB.BaseStorage.BaseStorage.__init__(self, name, base)
# We use a BTree because the items are sorted!
self._data = OOBTree.OOBTree()
self._index = {}
self._vindex = {}
self._base = base
self._size = 0
self._quota = quota
self._ltid = None
self._clear_temp()
try:
versions = base.versions
except AttributeError:
pass
else:
if base.versions():
raise POSException.StorageError(
"Demo base storage has version data")
# When DemoStorage needs to create a new oid, and there is a base
# storage, it must use that storage's new_oid() method. Else
# DemoStorage may end up assigning "new" oids that are already in use
# by the base storage, leading to a variety of "impossible" problems.
def new_oid(self):
if self._base is None:
return ZODB.BaseStorage.BaseStorage.new_oid(self)
else:
return self._base.new_oid()
def __len__(self):
base=self._base
return (base and len(base) or 0) + len(self._index)
def getSize(self):
s=100
for tid, (p, u, d, e, t) in self._data.items():
s=s+16+24+12+4+16+len(u)+16+len(d)+16+len(e)+16
for oid, pre, vdata, p, tid in t:
s=s+16+24+24+4+4+(p and (16+len(p)) or 4)
if vdata: s=s+12+16+len(vdata[0])+4
s=s+16*len(self._index)
for v in self._vindex.values():
s=s+32+16*len(v)
self._size=s def close(self):
return s self.base.close()
self.changes.close()
def abortVersion(self, src, transaction): def getName(self):
if transaction is not self._transaction: return self.__name__
raise POSException.StorageTransactionError(self, transaction) __repr__ = getName
if not src:
raise POSException.VersionCommitError("Invalid version")
self._lock_acquire() def getTid(self, oid):
try: try:
v = self._vindex.get(src, None) return self.changes.getTid(oid)
if not v: except ZODB.POSException.POSKeyError:
return return self.base.getTid(oid)
oids = []
for r in v.values():
oid, pre, (version, nv), p, tid = r
oids.append(oid)
if nv:
oid, pre, vdata, p, tid = nv
self._tindex.append([oid, r, None, p, self._tid])
else:
# effectively, delete the thing
self._tindex.append([oid, r, None, None, self._tid])
return self._tid, oids def iterator(self, start=None, end=None):
for t in self.base.iterator(start, end):
finally: self._lock_release() yield t
for t in self.changes.iterator(start, end):
yield t
def commitVersion(self, src, dest, transaction): def lastTransaction(self):
if transaction is not self._transaction: t = self.changes.lastTransaction()
raise POSException.StorageTransactionError(self, transaction) if t == ZODB.utils.z64:
t = self.base.lastTransaction()
return t
if not src: def __len__(self):
raise POSException.VersionCommitError("Invalid source version") return len(self.changes)
if src == dest:
raise POSException.VersionCommitError(
"Can't commit to same version: %s" % repr(src))
self._lock_acquire() def load(self, oid, version=''):
try: try:
v = self._vindex.get(src) return self.changes.load(oid, version)
if v is None: except ZODB.POSException.POSKeyError:
return return self.base.load(oid, version)
newserial = self._tid def loadBefore(self, oid, tid):
tindex = self._tindex
oids = []
for r in v.values():
oid, pre, vdata, p, tid = r
assert vdata is not None
oids.append(oid)
if dest:
new_vdata = dest, vdata[1]
else:
new_vdata = None
tindex.append([oid, r, new_vdata, p, self._tid])
return self._tid, oids
finally:
self._lock_release()
def load(self, oid, version):
self._lock_acquire()
try: try:
result = self.changes.loadBefore(oid, tid)
except ZODB.POSException.POSKeyError:
# The oid isn't in the changes, so defer to base
return self.base.loadBefore(oid, tid)
if result is None:
# The oid *was* in the changes, but there aren't any
# earlier records. Maybe there are in the base.
try: try:
oid, pre, vdata, p, tid = self._index[oid] return self.base.loadBefore(oid, tid)
except KeyError: except ZODB.POSException.POSKeyError:
if self._base: # The oid isn't in the base, so None will be the right result
return self._base.load(oid, version) pass
raise KeyError(oid)
return result
if vdata:
oversion, nv = vdata def loadSerial(self, oid, serial):
if oversion != version:
if nv:
# Return the current txn's tid with the non-version
# data.
p = nv[3]
else:
raise KeyError(oid)
if p is None:
raise KeyError(oid)
return p, tid
finally: self._lock_release()
def modifiedInVersion(self, oid):
self._lock_acquire()
try: try:
try: return self.changes.loadSerial(oid, serial)
oid, pre, vdata, p, tid = self._index[oid] except ZODB.POSException.POSKeyError:
if vdata: return vdata[0] return self.base.loadSerial(oid, serial)
return ''
except: return ''
finally: self._lock_release()
def store(self, oid, serial, data, version, transaction): def new_oid(self):
if transaction is not self._transaction: while 1:
raise POSException.StorageTransactionError(self, transaction) oid = ZODB.utils.p64(random.randint(1, 9223372036854775807))
try:
self._lock_acquire() self.changes.load(oid, '')
try: except ZODB.POSException.POSKeyError:
old = self._index.get(oid, None) pass
if old is None: else:
# Hm, nothing here, check the base version:
if self._base:
try:
p, tid = self._base.load(oid, '')
except KeyError:
pass
else:
old = oid, None, None, p, tid
nv=None
if old:
oid, pre, vdata, p, tid = old
if vdata:
if vdata[0] != version:
raise POSException.VersionLockError(oid)
nv=vdata[1]
else:
nv=old
if serial != tid:
raise POSException.ConflictError(
oid=oid, serials=(tid, serial), data=data)
r = [oid, old, version and (version, nv) or None, data, self._tid]
self._tindex.append(r)
s=self._tsize
s=s+72+(data and (16+len(data)) or 4)
if version: s=s+32+len(version)
if self._quota is not None and s > self._quota:
raise POSException.StorageError(
'''<b>Quota Exceeded</b><br>
The maximum quota for this demonstration storage
has been exceeded.<br>Have a nice day.''')
finally: self._lock_release()
return self._tid
def supportsVersions(self):
return 1
def _clear_temp(self):
self._tindex = []
self._tsize = self._size + 160
def lastTransaction(self):
return self._ltid
def _begin(self, tid, u, d, e):
self._tsize = self._size + 120 + len(u) + len(d) + len(e)
def _finish(self, tid, user, desc, ext):
if not self._tindex:
# No data, so we don't update anything.
return
self._size = self._tsize
self._data[tid] = None, user, desc, ext, tuple(self._tindex)
for r in self._tindex:
oid, pre, vdata, p, tid = r
old = self._index.get(oid)
# If the object had version data, remove the version data.
if old is not None:
oldvdata = old[2]
if oldvdata:
v = self._vindex[oldvdata[0]]
del v[oid]
if not v:
# If the version info is now empty, remove it.
del self._vindex[oldvdata[0]]
self._index[oid] = r
# If there is version data, then udpate self._vindex, too.
if vdata:
version = vdata[0]
v = self._vindex.get(version)
if v is None:
v = self._vindex[version] = {}
v[oid] = r
self._ltid = self._tid
def undoLog(self, first, last, filter=None):
if last < 0: # abs(last) is an upper bound on the # to return
last = first - last
self._lock_acquire()
try:
transactions = self._data.items()
pos = len(transactions)
r = []
i = 0
while i < last and pos:
pos -= 1
tid, (p, u, d, e, t) = transactions[pos]
# Bug alert: why do we skip this if the transaction
# has been packed?
if p:
continue
d = {'id': base64.encodestring(tid)[:-1],
'time': TimeStamp(tid).timeTime(),
'user_name': u, 'description': d}
if e:
d.update(cPickle.loads(e))
if filter is None or filter(d):
if i >= first:
r.append(d)
i += 1
return r
finally:
self._lock_release()
def versionEmpty(self, version):
return not self._vindex.get(version, None)
def versions(self, max=None):
r = []
for v in self._vindex.keys():
if self.versionEmpty(v):
continue continue
r.append(v) try:
if max is not None and len(r) >= max: self.base.load(oid, '')
break except ZODB.POSException.POSKeyError:
return r pass
else:
def _build_indexes(self, stop='\377\377\377\377\377\377\377\377'): continue
# Rebuild index structures from transaction data
index = {} return oid
vindex = {}
for tid, (p, u, d, e, t) in self._data.items():
if tid >= stop:
break
for r in t:
oid, pre, vdata, p, tid = r
old=index.get(oid, None)
if old is not None:
oldvdata=old[2]
if oldvdata:
v=vindex[oldvdata[0]]
del v[oid]
if not v: del vindex[oldvdata[0]]
index[oid]=r
if vdata:
version=vdata[0]
v=vindex.get(version, None)
if v is None: v=vindex[version]={}
vindex[vdata[0]][oid]=r
return index, vindex
def pack(self, t, referencesf):
# Packing is hard, at least when undo is supported.
# Even for a simple storage like this one, packing
# is pretty complex.
self._lock_acquire() def pack(self, t, referencesf, gc=False):
try: try:
self.changes.pack(t, referencesf, gc=False)
except TypeError, v:
if 'gc' in str(v):
pass # The gc arg isn't supported. Don't pack
raise
stop=`TimeStamp(*time.gmtime(t)[:5]+(t%60,))` def store(self, oid, serial, data, version, transaction):
assert version=='', "versions aren't supported"
# Build indexes up to the pack time:
index, vindex = self._build_indexes(stop)
# TODO: This packing algorithm is flawed. It ignores
# references from non-current records after the pack
# time.
# Now build an index of *only* those objects reachable
# from the root.
rootl = [z64]
pindex = {}
while rootl:
oid = rootl.pop()
if oid in pindex:
continue
# Scan non-version pickle for references
r = index.get(oid, None)
if r is None:
if self._base:
p, s = self._base.load(oid, '')
referencesf(p, rootl)
else:
pindex[oid] = r
oid, pre, vdata, p, tid = r
referencesf(p, rootl)
if vdata:
nv = vdata[1]
if nv:
oid, pre, vdata, p, tid = nv
referencesf(p, rootl)
# Now we're ready to do the actual packing.
# We'll simply edit the transaction data in place.
# We'll defer deleting transactions till the end
# to avoid messing up the BTree items.
deleted = []
for tid, (p, u, d, e, records) in self._data.items():
if tid >= stop:
break
o = []
for r in records:
c = pindex.get(r[0])
if c is None:
# GC this record, no longer referenced
continue
if c == r:
# This is the most recent revision.
o.append(r)
else:
# This record is not the indexed record,
# so it may not be current. Let's see.
vdata = r[3]
if vdata:
# Version record are current *only* if they
# are indexed
continue
else:
# OK, this isn't a version record, so it may be the
# non-version record for the indexed record.
vdata = c[3]
if vdata:
if vdata[1] != r:
# This record is not the non-version
# record for the indexed record
continue
else:
# The indexed record is not a version record,
# so this record can not be the non-version
# record for it.
continue
o.append(r)
if o:
if len(o) != len(records):
self._data[tid] = 1, u, d, e, tuple(o) # Reset data
else:
deleted.append(tid)
# Now delete empty transactions
for tid in deleted:
del self._data[tid]
# Now reset previous pointers for "current" records:
for r in pindex.values():
r[1] = None # Previous record
if r[2] and r[2][1]: # vdata
# If this record contains version data and
# non-version data, then clear it out.
r[2][1][2] = None
# Finally, rebuild indexes from transaction data:
self._index, self._vindex = self._build_indexes()
finally:
self._lock_release()
self.getSize()
def _splat(self):
"""Spit out a string showing state.
"""
o=[]
o.append('Transactions:')
for tid, (p, u, d, e, t) in self._data.items():
o.append(" %s %s" % (TimeStamp(tid), p))
for r in t:
oid, pre, vdata, p, tid = r
oid = oid_repr(oid)
tid = oid_repr(tid)
## if serial is not None: serial=str(TimeStamp(serial))
pre=id(pre)
if vdata and vdata[1]: vdata=vdata[0], id(vdata[1])
if p: p=''
o.append(' %s: %s' %
(id(r), `(oid, pre, vdata, p, tid)`))
o.append('\nIndex:')
items=self._index.items()
items.sort()
for oid, r in items:
if r: r=id(r)
o.append(' %s: %s' % (oid_repr(oid), r))
o.append('\nVersion Index:')
items=self._vindex.items()
items.sort()
for version, v in items:
o.append(' '+version)
vitems=v.items()
vitems.sort()
for oid, r in vitems:
if r: r=id(r)
o.append(' %s: %s' % (oid_repr(oid), r))
return '\n'.join(o)
def cleanup(self):
if self._base is not None:
self._base.cleanup()
def close(self):
if self._base is not None:
self._base.close()
def iterator(self, start=None, end=None):
# First iterate over the base storage
if self._base is not None:
for transaction in self._base.iterator(start, end):
yield transaction
# Then iterate over our local transactions
for tid, transaction in self._data.items():
if tid >= start and tid <= end:
yield TransactionRecord(tid, transaction)
class TransactionRecord(ZODB.BaseStorage.TransactionRecord):
def __init__(self, tid, transaction):
packed, user, description, extension, records = transaction
super(TransactionRecord, self).__init__(
tid, packed, user, description, extension)
self.records = transaction
def __iter__(self): # See if we already have changes for this oid
for record in self.records: try:
oid, prev, version, data, tid = record old = self.changes.load(oid, '')[1]
yield ZODB.BaseStorage.DataRecord(oid, tid, data, version, prev) except ZODB.POSException.POSKeyError:
try:
old = self.base.load(oid, '')[1]
except ZODB.POSException.POSKeyError:
old = serial
if old != serial:
raise ZODB.POSException.ConflictError(
oid=oid, serials=(old, serial)) # XXX untested branch
return self.changes.store(oid, serial, data, '', transaction)
DemoStorage demo (doctest)
--------------------------
Note that most people will configure the storage through ZConfig. If
you are one of those people, you may want to stop here. :) The
examples below show you how to use the storage from Python, but they
also exercise lots of details you might not be interested in.
To see how this works, we'll start by creating a base storage and
puting an object (in addition to the root object) in it:
>>> from ZODB.FileStorage import FileStorage
>>> base = FileStorage('base.fs')
>>> from ZODB.DB import DB
>>> db = DB(base)
>>> from persistent.mapping import PersistentMapping
>>> conn = db.open()
>>> conn.root()['1'] = PersistentMapping({'a': 1, 'b':2})
>>> import transaction
>>> transaction.commit()
>>> db.close()
>>> import os
>>> original_size = os.path.getsize('base.fs')
Now, lets reopen the base storage in read-only mode:
>>> base = FileStorage('base.fs', read_only=True)
And open a new storage to store changes:
>>> changes = FileStorage('changes.fs')
and combine the 2 in a demofilestorage:
>>> from ZODB.DemoStorage import DemoStorage
>>> storage = DemoStorage(base=base, changes=changes)
If there are no transactions, the storage reports the lastTransaction
of the base database:
>>> storage.lastTransaction() == base.lastTransaction()
True
Let's add some data:
>>> db = DB(storage)
>>> conn = db.open()
>>> items = conn.root()['1'].items()
>>> items.sort()
>>> items
[('a', 1), ('b', 2)]
>>> conn.root()['2'] = PersistentMapping({'a': 3, 'b':4})
>>> transaction.commit()
>>> conn.root()['2']['c'] = 5
>>> transaction.commit()
Here we can see that we haven't modified the base storage:
>>> original_size == os.path.getsize('base.fs')
True
But we have modified the changes database:
>>> len(changes)
2
Our lastTransaction reflects the lastTransaction of the changes:
>>> storage.lastTransaction() > base.lastTransaction()
True
>>> storage.lastTransaction() == changes.lastTransaction()
True
Let's walk over some of the methods so ewe can see how we delegate to
the new underlying storages:
>>> from ZODB.utils import p64, u64
>>> storage.load(p64(0), '') == changes.load(p64(0), '')
True
>>> storage.load(p64(0), '') == base.load(p64(0), '')
False
>>> storage.load(p64(1), '') == base.load(p64(1), '')
True
>>> serial = base.getTid(p64(0))
>>> storage.loadSerial(p64(0), serial) == base.loadSerial(p64(0), serial)
True
>>> serial = changes.getTid(p64(0))
>>> storage.loadSerial(p64(0), serial) == changes.loadSerial(p64(0),
... serial)
True
The object id of the new object is quite random, and typically large:
>>> print u64(conn.root()['2']._p_oid)
7106521602475165646
Let's look at some other methods:
>>> storage.getName()
"DemoStorage('base.fs', 'changes.fs')"
>>> storage.sortKey() == changes.sortKey()
True
>>> storage.getSize() == changes.getSize()
True
>>> len(storage) == len(changes)
True
Undo methods are simply copied from the changes storage:
>>> [getattr(storage, name) == getattr(changes, name)
... for name in ('supportsUndo', 'undo', 'undoLog', 'undoInfo')
... ]
[True, True, True, True]
...@@ -174,9 +174,9 @@ ...@@ -174,9 +174,9 @@
<sectiontype name="demostorage" datatype=".DemoStorage" <sectiontype name="demostorage" datatype=".DemoStorage"
implements="ZODB.storage"> implements="ZODB.storage">
<key name="name" default="Demo Storage"/> <key name="name" />
<section type="ZODB.storage" name="*" attribute="base"/> <section type="ZODB.storage" name="*" attribute="base" />
<key name="quota" datatype="integer"/> <section type="ZODB.storage" name="changes" attribute="changes" />
</sectiontype> </sectiontype>
......
...@@ -125,9 +125,11 @@ class DemoStorage(BaseConfig): ...@@ -125,9 +125,11 @@ class DemoStorage(BaseConfig):
base = self.config.base.open() base = self.config.base.open()
else: else:
base = None base = None
return DemoStorage(self.config.name, if self.config.changes:
base=base, changes = self.config.changes.open()
quota=self.config.quota) else:
changes = None
return DemoStorage(self.config.name, base=base, changes=changes)
class FileStorage(BaseConfig): class FileStorage(BaseConfig):
......
...@@ -12,18 +12,36 @@ ...@@ -12,18 +12,36 @@
# #
############################################################################## ##############################################################################
import unittest import unittest
import random
import transaction import transaction
from ZODB.DB import DB from ZODB.DB import DB
from zope.testing import doctest
import zope.testing.setupstack
import ZODB.utils import ZODB.utils
import ZODB.DemoStorage import ZODB.DemoStorage
from ZODB.tests import StorageTestBase, BasicStorage from ZODB.tests import (
from ZODB.tests import Synchronization BasicStorage,
HistoryStorage,
class DemoStorageTests(StorageTestBase.StorageTestBase, IteratorStorage,
BasicStorage.BasicStorage, MTStorage,
Synchronization.SynchronizedStorage, PackableStorage,
): RevisionStorage,
StorageTestBase,
Synchronization,
)
class DemoStorageTests(
StorageTestBase.StorageTestBase,
BasicStorage.BasicStorage,
HistoryStorage.HistoryStorage,
IteratorStorage.ExtendedIteratorStorage,
IteratorStorage.IteratorStorage,
MTStorage.MTStorage,
PackableStorage.PackableStorage,
RevisionStorage.RevisionStorage,
Synchronization.SynchronizedStorage,
):
def setUp(self): def setUp(self):
self._storage = ZODB.DemoStorage.DemoStorage() self._storage = ZODB.DemoStorage.DemoStorage()
...@@ -44,7 +62,26 @@ class DemoStorageTests(StorageTestBase.StorageTestBase, ...@@ -44,7 +62,26 @@ class DemoStorageTests(StorageTestBase.StorageTestBase,
self.assertEqual(s2.load(ZODB.utils.z64, ''), self.assertEqual(s2.load(ZODB.utils.z64, ''),
self._storage.load(ZODB.utils.z64, '')) self._storage.load(ZODB.utils.z64, ''))
def checkLengthAndBool(self):
self.assertEqual(len(self._storage), 0)
self.assert_(not self._storage)
db = DB(self._storage) # creates object 0. :)
self.assertEqual(len(self._storage), 1)
self.assert_(self._storage)
conn = db.open()
for i in range(10):
conn.root()[i] = conn.root().__class__()
transaction.commit()
self.assertEqual(len(self._storage), 11)
self.assert_(self._storage)
def checkLoadBeforeUndo(self):
pass # we don't support undo yet
checkUndoZombie = checkLoadBeforeUndo
def checkPackWithMultiDatabaseReferences(self):
pass # we never do gc
class DemoStorageWrappedBase(DemoStorageTests): class DemoStorageWrappedBase(DemoStorageTests):
def setUp(self): def setUp(self):
...@@ -59,29 +96,92 @@ class DemoStorageWrappedBase(DemoStorageTests): ...@@ -59,29 +96,92 @@ class DemoStorageWrappedBase(DemoStorageTests):
def _makeBaseStorage(self): def _makeBaseStorage(self):
raise NotImplementedError raise NotImplementedError
class DemoStorageWrappedAroundFileStorage(DemoStorageWrappedBase): class DemoStorageWrappedAroundMappingStorage(DemoStorageWrappedBase):
def _makeBaseStorage(self): def _makeBaseStorage(self):
from ZODB.MappingStorage import MappingStorage from ZODB.MappingStorage import MappingStorage
return MappingStorage() return MappingStorage()
class DemoStorageWrappedAroundMappingStorage(DemoStorageWrappedBase): class DemoStorageWrappedAroundFileStorage(DemoStorageWrappedBase):
def _makeBaseStorage(self): def _makeBaseStorage(self):
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
return FileStorage('FileStorageTests.fs') return FileStorage('FileStorageTests.fs')
def setUp(test):
random.seed(0)
zope.testing.setupstack.setUpDirectory(test)
zope.testing.setupstack.register(test, transaction.abort)
def testSomeDelegation():
r"""
>>> class S:
... def __init__(self, name):
... self.name = name
... def registerDB(self, db):
... print self.name, db
... def close(self):
... print self.name, 'closed'
... sortKey = getSize = __len__ = history = getTid = None
... tpc_finish = tpc_vote = tpc_transaction = None
... _lock_acquire = _lock_release = lambda: None
... getName = lambda self: 'S'
... isReadOnly = tpc_transaction = None
... supportsUndo = undo = undoLog = undoInfo = None
... supportsTransactionalUndo = None
... def new_oid(self):
... return '\0' * 8
... def tpc_begin(self, t, tid, status):
... print 'begin', tid, status
... def tpc_abort(self, t):
... pass
>>> from ZODB.DemoStorage import DemoStorage
>>> storage = DemoStorage(base=S(1), changes=S(2))
>>> storage.registerDB(1)
2 1
>>> storage.close()
1 closed
2 closed
>>> storage.tpc_begin(1, 2, 3)
begin 2 3
>>> storage.tpc_abort(1)
"""
def test_suite():
return unittest.TestSuite((
doctest.DocFileSuite('synchronized.txt'),
doctest.DocTestSuite(
setUp=setUp, tearDown=zope.testing.setupstack.tearDown,
),
doctest.DocFileSuite(
'README.txt',
setUp=setUp, tearDown=zope.testing.setupstack.tearDown,
),
))
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite((
doctest.DocTestSuite(
setUp=setUp, tearDown=zope.testing.setupstack.tearDown,
),
doctest.DocFileSuite(
'../DemoStorage.test',
setUp=setUp, tearDown=zope.testing.setupstack.tearDown,
),
))
suite.addTest(unittest.makeSuite(DemoStorageTests, 'check')) suite.addTest(unittest.makeSuite(DemoStorageTests, 'check'))
suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundFileStorage, suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundFileStorage,
'check')) 'check'))
suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundMappingStorage, suite.addTest(unittest.makeSuite(DemoStorageWrappedAroundMappingStorage,
'check')) 'check'))
return suite return suite
if __name__ == "__main__":
loader = unittest.TestLoader()
loader.testMethodPrefix = "check"
unittest.main(testLoader=loader)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment