Commit b783cfc0 authored by Michael Kerrin's avatar Michael Kerrin

Fix bug in transaction abort which results in uncommitted data building

up on the system. Also add some tests for the configuration of BlobStorage.
parent 3005e976
......@@ -2,7 +2,6 @@
import os
import time
import tempfile
import weakref
from zope.interface import implements
......@@ -22,6 +21,11 @@ class Blob(Persistent):
_p_blob_uncommitted = None
_p_blob_data = None
# All persistent object store a reference to their data manager, a database
# connection in the _p_jar attribute. So we are going to do the same with
# blobs here.
_p_blob_manager = None
def open(self, mode="r"):
""" Returns a file(-like) object representing blob data. This
method will either return the file object, raise a BlobError
......@@ -86,8 +90,14 @@ class Blob(Persistent):
# counts that keep track of open readers and writers and
# close any writable filehandles we've opened.
dm = BlobDataManager(self, result)
transaction.get().register(dm)
if self._p_blob_manager is None:
dm = BlobDataManager(self, result)
transaction.get().register(dm)
else:
# each blob data manager should manage only the one blob
# assert that this is the case and it is the correct blob
assert self._p_blob_manager.blob is self
self._p_blob_manager.register_fh(result)
return result
......@@ -144,10 +154,14 @@ class BlobDataManager:
# we keep a weakref to the file handle because we don't want to
# keep it alive if all other references to it die (e.g. in the
# case it's opened without assigning it to a name).
self.fhref = weakref.ref(filehandle)
self.fhrefs = utils.WeakSet()
self.register_fh(filehandle)
self.subtransaction = False
self.sortkey = time.time()
def register_fh(self, filehandle):
self.fhrefs.add(filehandle)
def abort_sub(self, transaction):
pass
......@@ -169,16 +183,15 @@ class BlobDataManager:
def commit(self, object, transaction):
if not self.subtransaction:
self.blob._p_blob_clear() # clear all blob refcounts
filehandle = self.fhref()
if filehandle is not None:
filehandle.close()
self.fhrefs.map(lambda fhref: fhref.close())
def abort(self, object, transaction):
if not self.subtransaction:
self.blob._p_blob_clear()
filehandle = self.fhref()
if filehandle is not None:
filehandle.close()
self.fhrefs.map(lambda fhref: fhref.close())
if self.blob._p_blob_uncommitted is not None and \
os.path.exists(self.blob._p_blob_uncommitted):
os.unlink(self.blob._p_blob_uncommitted)
def sortKey(self):
return self.sortkey
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import tempfile, shutil, unittest
from ZODB.tests.testConfig import ConfigTestBase
from ZConfig import ConfigurationSyntaxError
class BlobConfigTestBase(ConfigTestBase):
def setUp(self):
super(BlobConfigTestBase, self).setUp()
self.blob_dir = tempfile.mkdtemp()
def tearDown(self):
super(BlobConfigTestBase, self).tearDown()
shutil.rmtree(self.blob_dir)
class ZODBBlobConfigTest(BlobConfigTestBase):
def test_map_config1(self):
self._test(
"""
<zodb>
<blobstorage>
blob-dir %s
<mappingstorage/>
</blobstorage>
</zodb>
""" % self.blob_dir)
def test_file_config1(self):
path = tempfile.mktemp()
self._test(
"""
<zodb>
<blobstorage>
blob-dir %s
<filestorage>
path %s
</filestorage>
</blobstorage>
</zodb>
""" %(self.blob_dir, path))
def test_blod_dir_needed(self):
self.assertRaises(ConfigurationSyntaxError,
self._test,
"""
<zodb>
<blobstorage>
<mappingstorage/>
</blobstorage>
</zodb>
""")
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ZODBBlobConfigTest))
return suite
if __name__ == '__main__':
unittest.main(defaultTest = 'test_suite')
......@@ -109,6 +109,8 @@ aborted above, so the object should be "clean" when we start):
>>> bool(blob1a._p_changed)
False
>>> blob1a.open('r').read()
'this is blob 1'
>>> blob1afh3 = blob1a.open('a')
>>> blob1afh3.write('woot!')
>>> blob1a._p_blob_refcounts()
......@@ -125,6 +127,8 @@ transaction:
>>> transaction.commit()
>>> blob2._p_blob_refcounts()
(0, 0)
>>> blob1._p_blob_refcounts()
(0, 0)
Since we committed the current transaction above, the aggregate
changes we've made to blob, blob1a (these refer to the same object) and
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment