Commit df18ff7f authored by Jim Fulton

Added IExternalGC support to ClientStorage.

parent ee626d10
@@ -37,7 +37,7 @@ New Features
 - A new storage interface, IExternalGC, to support external garbage
   collection, http://wiki.zope.org/ZODB/ExternalGC, has been defined
-  and implemented for FileStorage.
+  and implemented for FileStorage and ClientStorage.
 
 - As a small convenience (mainly for tests), you can now specify
   initial data as a string argument to the Blob constructor.
...
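For reference, the IExternalGC interface named in the entry above adds a single method on top of the normal storage API. The following is only a rough sketch of its shape (the wiki page above and ZODB.interfaces are authoritative):

    import zope.interface

    class IExternalGC(zope.interface.Interface):

        def deleteObject(oid, serial, transaction):
            """Mark an object as deleted within the given transaction.

            The serial must match the object's current serial, and after
            the transaction commits, loading the oid raises POSKeyError
            unless a later transaction writes the object again.
            """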
@@ -634,6 +634,7 @@ class ClientStorage(object):
                 ZODB.interfaces.IStorageUndoable,
                 ZODB.interfaces.IStorageCurrentRecordIteration,
                 ZODB.interfaces.IBlobStorage,
+                ZODB.interfaces.IExternalGC,
                 ):
             if (iface.__module__, iface.__name__) in self._info.get(
                     'interfaces', ()):
@@ -969,6 +970,11 @@ class ClientStorage(object):
             os.rename(blob_filename+'.dl', blob_filename)
             os.chmod(blob_filename, stat.S_IREAD)
 
+    def deleteObject(self, oid, serial, txn):
+        self._check_trans(txn)
+        self._server.deleteObject(oid, serial, id(txn))
+        self._tbuf.store(oid, None)
+
     def loadBlob(self, oid, serial):
         # Load a blob.  If it isn't present and we have a shared blob
         # directory, then assume that it doesn't exist on the server
...
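The new ClientStorage.deleteObject validates the transaction, forwards the call to the server, and buffers a store with no data (presumably so the cached copy of the object is invalidated at commit). A minimal usage sketch, mirroring the doctest added further down in this commit; addr and oid are placeholders for a running server address and an existing object id:

    import transaction
    from ZEO.ClientStorage import ClientStorage

    storage = ClientStorage(addr)
    _, serial = storage.load(oid)           # the current serial is required

    txn = transaction.begin()
    storage.tpc_begin(txn)
    storage.deleteObject(oid, serial, txn)  # queued and sent to the server
    storage.tpc_vote(txn)
    storage.tpc_finish(txn)                 # subsequent loads raise POSKeyError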
@@ -37,6 +37,10 @@ class CommitLog:
     def size(self):
         return self.file.tell()
 
+    def delete(self, oid, serial):
+        self.pickler.dump(('d', oid, serial))
+        self.stores += 1
+
     def store(self, oid, serial, data):
         self.pickler.dump(('s', oid, serial, data))
         self.stores += 1
...
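The commit log that backs a server-side transaction is a pickle stream of tagged tuples; the new 'd' records carry only the oid and serial. A standalone sketch of that record format (illustrative only, not the ZEO source):

    # Delete records are ('d', oid, serial); ordinary stores are
    # ('s', oid, serial, data).  The tag is what the vote-time replay
    # dispatches on.
    import cPickle, tempfile

    log = tempfile.TemporaryFile()
    pickler = cPickle.Pickler(log, 1)
    pickler.dump(('d', 'oid-1', 'serial-1'))
    pickler.dump(('s', 'oid-2', 'serial-2', 'pickled state'))

    log.seek(0)
    unpickler = cPickle.Unpickler(log)
    for i in range(2):
        record = unpickler.load()
        print record[0], record[1:]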
@@ -225,6 +225,9 @@ class StorageServer:
     def storeBlobShared(self, oid, serial, data, filename, id):
         self.rpc.callAsync('storeBlobShared', oid, serial, data, filename, id)
 
+    def deleteObject(self, oid, serial, id):
+        self.rpc.callAsync('deleteObject', oid, serial, id)
+
     ##
     # Start two-phase commit for a transaction
     # @param id id used by client to identify current transaction. The
...
@@ -468,6 +468,11 @@ class ZEOStorage:
     # Most of the real implementations are in methods beginning with
     # an _.
 
+    def deleteObject(self, oid, serial, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        self.stats.stores += 1
+        self.txnlog.delete(oid, serial)
+
     def storea(self, oid, serial, data, id):
         self._check_tid(id, exc=StorageTransactionError)
         self.stats.stores += 1
@@ -518,6 +523,30 @@ class ZEOStorage:
         else:
             return self._wait(lambda: self._undo(trans_id))
 
+    def _delete(self, oid, serial):
+        err = None
+        try:
+            self.storage.deleteObject(oid, serial, self.transaction)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except Exception, err:
+            self.store_failed = 1
+            if isinstance(err, ConflictError):
+                self.stats.conflicts += 1
+                self.log("conflict error oid=%s msg=%s" %
+                         (oid_repr(oid), str(err)), BLATHER)
+            if not isinstance(err, TransactionError):
+                # Unexpected errors are logged and passed to the client
+                self.log("store error: %s, %s" % sys.exc_info()[:2],
+                         logging.ERROR, exc_info=True)
+            err = self._marshal_error(err)
+            # The exception is reported back as newserial for this oid
+            self.serials.append((oid, err))
+        else:
+            self.invalidated.append(oid)
+
+        return err is None
+
     def _store(self, oid, serial, data):
         err = None
         try:
@@ -652,7 +681,9 @@ class ZEOStorage:
             store_type = store[0]
             store_args = store[1:]
 
-            if store_type == 's':
+            if store_type == 'd':
+                do_store = self._delete
+            elif store_type == 's':
                 do_store = self._store
             elif store_type == 'r':
                 do_store = self._restore
...
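Like _store, _delete reports failures to the client by appending the marshaled exception in place of a new serial for the oid, so a delete attempted with a stale serial should surface on the client as a ConflictError while committing. A hedged sketch of that behaviour; whether the error is raised at vote or at finish depends on how ZEO delivers the serials, which is an assumption here:

    import transaction
    from ZODB.POSException import ConflictError

    def try_delete(storage, oid, serial):
        # Returns True if the delete committed, False on a conflict.
        txn = transaction.begin()
        storage.tpc_begin(txn)
        storage.deleteObject(oid, serial, txn)
        try:
            storage.tpc_vote(txn)
            storage.tpc_finish(txn)
            return True
        except ConflictError:
            storage.tpc_abort(txn)
            return False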
@@ -87,7 +87,7 @@ class TransactionBuffer:
             self.pickler.dump((oid, data))
             self.count += 1
             # Estimate per-record cache size
-            self.size = self.size + len(data) + 31
+            self.size = self.size + (data and len(data) or 0) + 31
         finally:
             self.lock.release()
...
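The TransactionBuffer change is needed because delete records arrive with data=None (ClientStorage.deleteObject above buffers (oid, None)), and len(None) would raise a TypeError when estimating the cache size. A tiny illustration of the new expression:

    def record_size(data):
        # data is None for a delete record; 31 bytes is the fixed
        # per-record overhead estimate used by the buffer.
        return (data and len(data) or 0) + 31

    assert record_size(None) == 31
    assert record_size('x' * 100) == 131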
@@ -342,6 +342,7 @@ class FileStorageTests(FullGenericTests):
         ('ZODB.interfaces', 'IStorageIteration'),
         ('ZODB.interfaces', 'IStorageUndoable'),
         ('ZODB.interfaces', 'IStorageCurrentRecordIteration'),
+        ('ZODB.interfaces', 'IExternalGC'),
         ('ZODB.interfaces', 'IStorage'),
         ('zope.interface', 'Interface'),
         ),
@@ -1128,7 +1129,6 @@ def client_has_newer_data_than_server():
 def history_over_zeo():
     """
     >>> addr, _ = start_server()
-    >>> import ZEO, ZODB.blob, transaction
     >>> db = ZEO.DB(addr)
     >>> wait_connected(db.storage)
     >>> conn = db.open()
@@ -1143,8 +1143,7 @@ def history_over_zeo():
 def dont_log_poskeyerrors_on_server():
     """
     >>> addr, admin = start_server()
-    >>> import ZEO.ClientStorage
-    >>> cs = ZEO.ClientStorage.ClientStorage(addr)
+    >>> cs = ClientStorage(addr)
     >>> cs.load(ZODB.utils.p64(1))
     Traceback (most recent call last):
     ...
@@ -1156,6 +1155,52 @@ def dont_log_poskeyerrors_on_server():
     False
     """
 
+def delete_object_multiple_clients():
+    """If we delete on one client, the delete should be reflected on the other.
+
+    First, we'll create an object:
+
+        >>> addr, _ = start_server()
+        >>> db = ZEO.DB(addr)
+        >>> conn = db.open()
+        >>> conn.root()[0] = conn.root().__class__()
+        >>> transaction.commit()
+        >>> oid = conn.root()[0]._p_oid
+
+    We verify that we can read it in another client, which also loads
+    it into the client cache.
+
+        >>> cs = ClientStorage(addr)
+        >>> p, s = cs.load(oid)
+
+    Now, we'll remove the object:
+
+        >>> txn = transaction.begin()
+        >>> db.storage.tpc_begin(txn)
+        >>> db.storage.deleteObject(oid, s, txn)
+        >>> db.storage.tpc_vote(txn)
+        >>> db.storage.tpc_finish(txn)
+
+    And we'll get a POSKeyError if we try to access it:
+
+        >>> db.storage.load(oid) # doctest: +ELLIPSIS
+        Traceback (most recent call last):
+        ...
+        POSKeyError: ...
+
+    We'll wait for our other storage to get the invalidation and then
+    try to access the object.  We'll get a POSKeyError there too:
+
+        >>> tid = db.storage.lastTransaction()
+        >>> forker.wait_until(lambda : cs.lastTransaction() == tid)
+        >>> cs.load(oid) # doctest: +ELLIPSIS
+        Traceback (most recent call last):
+        ...
+        POSKeyError: ...
+
+        >>> db.close()
+        >>> cs.close()
+    """
+
 slow_test_classes = [
     BlobAdaptedFileStorageTests, BlobWritableCacheTests,
@@ -1207,6 +1252,16 @@ class ServerManagingClientStorage(ClientStorage):
 def create_storage_shared(name, blob_dir):
     return ServerManagingClientStorage(name, blob_dir, True)
 
+class ServerManagingClientStorageForIExternalGCTest(
+    ServerManagingClientStorage):
+
+    def pack(self, t=None, referencesf=None):
+        ServerManagingClientStorage.pack(self, t, referencesf, wait=True)
+        # Packing doesn't clear old versions out of zeo client caches,
+        # so we'll clear the caches.
+        self._cache.clear()
+        ZEO.ClientStorage._check_blob_cache_size(self.blob_dir, 0)
+
 def test_suite():
     suite = unittest.TestSuite()
@@ -1227,6 +1282,10 @@ def test_suite():
             setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
             ),
         )
+    zeo.addTest(PackableStorage.IExternalGC_suite(
+        lambda :
+        ServerManagingClientStorageForIExternalGCTest('data.fs', 'blobs')
+        ))
     for klass in quick_test_classes:
         zeo.addTest(unittest.makeSuite(klass, "check"))
     zeo.layer = ZODB.tests.util.MininalTestLayer('testZeo-misc')
...