Commit 0e267880 authored by Jim Fulton's avatar Jim Fulton

A new storage interface, IExternalGC, to support external garbage

collection, http://wiki.zope.org/ZODB/ExternalGC, has been defined
and implemented for FileStorage.
parent 95563b19
......@@ -35,6 +35,13 @@ New Features
- POSKeyErrors are no longer logged by ZEO servers, because they are
really client errors.
- A new storage interface, IExternalGC, to support external garbage
collection, http://wiki.zope.org/ZODB/ExternalGC, has been defined
and implemented for FileStorage.
- As a small convenience (mainly for tests), you can now specify
initial data as a string argument to the Blob constructor.
3.9.0a8 (2008-12-15)
====================
......
......@@ -425,11 +425,13 @@ class FileStorage(
if h.plen:
data = self._file.read(h.plen)
return data, h.tid
else:
elif h.back:
# Get the data from the backpointer, but tid from
# current txn.
data = self._loadBack_impl(oid, h.back)[0]
return data, h.tid
else:
raise POSKeyError(oid)
finally:
self._lock_release()
......@@ -524,6 +526,41 @@ class FileStorage(
finally:
self._lock_release()
def deleteObject(self, oid, oldserial, transaction):
    """Transactionally mark the object identified by ``oid`` as deleted.

    Appends a new data record for the object to the pending
    transaction data, so that subsequent loads of *current* data for
    the object fail with a POSKeyError.  Returns the storage's tid for
    the current transaction.

    Raises:
      ReadOnlyError: the storage is read-only.
      StorageTransactionError: ``transaction`` is not the transaction
        currently in its first commit phase for this storage.
      POSKeyError: the oid is not in the storage's index.
      ConflictError: ``oldserial`` does not match the most recently
        committed serial for the object.
    """
    if self._is_read_only:
        raise POSException.ReadOnlyError()
    # Must be called between tpc_begin and tpc_vote of the same
    # transaction (first phase of two-phase commit).
    if transaction is not self._transaction:
        raise POSException.StorageTransactionError(self, transaction)
    self._lock_acquire()
    try:
        # File position of the object's most recent committed record;
        # 0 means the oid is unknown.
        old = self._index_get(oid, 0)
        if not old:
            raise POSException.POSKeyError(oid)
        h = self._read_data_header(old, oid)
        committed_tid = h.tid
        # Seat belt: the caller must have seen the current revision,
        # otherwise this is a write conflict.
        if oldserial != committed_tid:
            raise POSException.ConflictError(
                oid=oid, serials=(committed_tid, oldserial))
        pos = self._pos
        # Offset the new record will occupy once the buffered
        # transaction data (self._tfile) is appended after the
        # transaction header (length self._thl) at the current end of
        # the main file (self._pos).
        here = pos + self._tfile.tell() + self._thl
        self._tindex[oid] = here
        # New record with previous-record pointer ``old`` and zero
        # lengths; together with the z64 written below this
        # presumably marks a deletion record (the current-load path
        # raises POSKeyError when both plen and back are zero) —
        # confirm against the FileStorage record format.
        new = DataHeader(oid, self._tid, old, pos, 0, 0)

        self._tfile.write(new.asString())
        self._tfile.write(z64)

        # Check quota
        if self._quota is not None and here > self._quota:
            raise FileStorageQuotaError(
                "The storage quota has been exceeded.")

        return self._tid

    finally:
        self._lock_release()
def _data_find(self, tpos, oid, data):
# Return backpointer for oid. Must call with the lock held.
# This is a file offset to oid's data record if found, else 0.
......
......@@ -969,6 +969,27 @@ class IStorageCurrentRecordIteration(IStorage):
"""
class IExternalGC(IStorage):
    """Storage interface for external garbage collection support."""

    def deleteObject(oid, serial, transaction):
        """Mark an object as deleted.

        This method marks an object as deleted via a new object
        revision.  Subsequent attempts to load current data for the
        object will fail with a POSKeyError, but loads for
        non-current data will succeed if there are previous
        non-delete records.  The object will be removed from the
        storage when all non-delete records are removed.

        The storage's transaction id for the current transaction is
        returned.

        The serial argument must match the most recently committed
        serial for the object.  This is a seat belt.

        This method can only be called in the first phase of 2-phase
        commit.
        """
class IBlob(Interface):
"""A BLOB supports efficient handling of large data within ZODB."""
......
Storage Support for external GC
===============================
A storage that provides IExternalGC supports external garbage
collectors by providing a deleteObject method that transactionally
deletes an object.
The tests below are given a create_storage function that creates a storage instance.
>>> storage = create_storage()
>>> import ZODB.blob, transaction
>>> db = ZODB.DB(storage)
>>> conn = db.open()
>>> conn.root()[0] = conn.root().__class__()
>>> conn.root()[1] = ZODB.blob.Blob('some data')
>>> transaction.commit()
>>> oid0 = conn.root()[0]._p_oid
>>> oid1 = conn.root()[1]._p_oid
>>> del conn.root()[0]
>>> del conn.root()[1]
>>> transaction.commit()
At this point, objects 0 and 1 are garbage, but they're still in the storage:
>>> p0, s0 = storage.load(oid0, '')
>>> p1, s1 = storage.load(oid1, '')
Now we'll use the new deleteObject API to delete the objects. We can't
go through the database to do this, so we'll have to manage the
transaction ourselves.
>>> txn = transaction.begin()
>>> storage.tpc_begin(txn)
>>> tid = storage.deleteObject(oid0, s0, txn)
>>> tid = storage.deleteObject(oid1, s1, txn)
>>> storage.tpc_vote(txn)
>>> storage.tpc_finish(txn)
Now if we try to load data for the objects, we get a POSKeyError:
>>> storage.load(oid0, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
>>> storage.load(oid1, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
We can still get the data if we load before the time we deleted.
>>> storage.loadBefore(oid0, conn.root()._p_serial) == (p0, s0, tid)
True
>>> storage.loadBefore(oid1, conn.root()._p_serial) == (p1, s1, tid)
True
>>> open(storage.loadBlob(oid1, s1)).read()
'some data'
If we pack, however, the old data will be removed and the data will be
gone:
>>> import time
>>> db.pack(time.time()+1)
>>> storage.load(oid0, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
>>> storage.load(oid1, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
>>> storage.loadBefore(oid0, conn.root()._p_serial) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
>>> storage.loadBefore(oid1, conn.root()._p_serial) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
>>> storage.loadBlob(oid1, s1) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
POSKeyError: ...
A conflict error is raised if the serial we provide to deleteObject
isn't current:
>>> conn.root()[0] = conn.root().__class__()
>>> transaction.commit()
>>> oid = conn.root()[0]._p_oid
>>> bad_serial = conn.root()[0]._p_serial
>>> conn.root()[0].x = 1
>>> transaction.commit()
>>> txn = transaction.begin()
>>> storage.tpc_begin(txn)
>>> storage.deleteObject(oid, bad_serial, txn) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
ConflictError: database conflict error ...
>>> storage.tpc_abort(txn)
>>> storage.close()
......@@ -13,24 +13,22 @@
##############################################################################
"""Run some tests relevant for storages that support pack()."""
import cPickle
from cStringIO import StringIO
import time
from persistent import Persistent
from persistent.mapping import PersistentMapping
import transaction
import ZODB.interfaces
from ZODB import DB
from ZODB.POSException import ConflictError, StorageError
from ZODB.serialize import referencesf
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import snooze
from ZODB.POSException import ConflictError, StorageError
from ZODB.tests.MTStorage import TestThread
from ZODB.tests.StorageTestBase import snooze
from zope.testing import doctest
import cPickle
import time
import transaction
import ZODB.interfaces
import ZODB.tests.util
import zope.testing.setupstack
ZERO = '\0'*8
......@@ -750,3 +748,17 @@ class ElapsedTimer:
def elapsed_millis(self):
return int((time.time() - self.start_time) * 1000)
def IExternalGC_suite(factory):
    """Return a doctest suite exercising the generic IExternalGC tests.

    ``factory`` is a callable that creates and returns a storage; it is
    exposed to the doctest as ``create_storage``.  (Per the call sites,
    the factory takes a name and a blob directory name.)
    """

    def setup(test):
        # Standard per-test setup (temp directory etc.), then inject
        # the storage factory into the doctest's globals.
        ZODB.tests.util.setUp(test)
        test.globs['create_storage'] = factory

    return doctest.DocFileSuite(
        'IExternalGC.test',
        setUp=setup, tearDown=zope.testing.setupstack.tearDown)
......@@ -581,7 +581,6 @@ def pack_with_open_blob_files():
>>> db.close()
"""
def test_suite():
from zope.testing import doctest
......@@ -600,6 +599,8 @@ def test_suite():
test_blob_storage_recovery=True,
test_packing=True,
))
suite.addTest(PackableStorage.IExternalGC_suite(
lambda : ZODB.FileStorage.FileStorage('data.fs', blob_dir='blobs')))
return suite
if __name__=='__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment