Commit 643e267f authored by Jim Fulton's avatar Jim Fulton

Made a number of blob changes:

- Unwritten blobs can now be read, and are empty.

- Blobs are considered modified when opened for writing.  This is a
   little bit more conservative than before but fixes a bug that a file
   opened with 'w' actually does modify the file and wasn't considered
   to be a change before.

- Optimistic savepoints now work.

- Fixed bug: could open multiple files for writing.

- Fixed bug: aborting a transaction removed uncommitted data for
   uncommitted blobs.

Todo: 
   Need to remove uncommitted data file if a blob is GCed even when a
   transaction isn't aborted or when it hasn't been added to anything.

- No-longer close files on transaction boundaries.

This allows us to get rid of the transaction-manager dance.
parent 3ccbf36d
...@@ -16,12 +16,14 @@ ...@@ -16,12 +16,14 @@
import base64 import base64
import logging import logging
import logging
import os import os
import shutil import shutil
import sys import sys
import time
import tempfile import tempfile
import logging import threading
import time
import weakref
import zope.interface import zope.interface
...@@ -42,78 +44,123 @@ BLOB_SUFFIX = ".blob" ...@@ -42,78 +44,123 @@ BLOB_SUFFIX = ".blob"
valid_modes = 'r', 'w', 'r+', 'a' valid_modes = 'r', 'w', 'r+', 'a'
# Threading issues:
# We want to support closing blob files when they are destroyed.
# This introduces a threading issue, since a blob file may be destroyed
# via GC in any thread.
class Blob(persistent.Persistent): class Blob(persistent.Persistent):
"""A BLOB supports efficient handling of large data within ZODB.""" """A BLOB supports efficient handling of large data within ZODB."""
zope.interface.implements(ZODB.interfaces.IBlob) zope.interface.implements(ZODB.interfaces.IBlob)
_os_link = os.rename
_p_blob_readers = 0
_p_blob_writers = 0
_p_blob_uncommitted = None # Filename of the uncommitted (dirty) data _p_blob_uncommitted = None # Filename of the uncommitted (dirty) data
_p_blob_data = None # Filename of the committed data _p_blob_committed = None # Filename of the committed data
# All persistent object store a reference to their data manager, a database readers = writers = None
# connection in the _p_jar attribute. So we are going to do the same with def __setstate__(self, state=None):
# blobs here. # We use lists here because it will allow is to add and remove
_p_blob_manager = None # atomically
self.readers = []
# Blobs need to participate in transactions even when not connected to self.writers = []
# a database yet. If you want to use a non-default transaction manager,
# you can override it via _p_blob_transaction. This is currently __init__ = __setstate__
# required for unit testing.
_p_blob_transaction = None def __getstate__(self):
return None
def _p_deactivate(self):
# Only ghostify if we are unopened.
if self.readers or self.writers:
return
super(Blob, self)._p_deactivate()
def _p_invalidate(self):
# Force-close any open readers or writers,
# XXX should we warn of this? Maybe?
if self._p_changed is None:
return
for ref in self.readers+self.writers:
f = ref()
if f is not None:
f.close()
if (self._p_blob_uncommitted
and os.path.exists(self._p_blob_uncommitted)
):
os.remove(self._p_blob_uncommitted)
super(Blob, self)._p_invalidate()
@property
def opened(self):
return bool(self.readers or self.writers)
def closed(self, f):
# We use try/except below because another thread might remove
# the ref after we check it if the file is GCed.
for file_refs in (self.readers, self.writers):
for ref in file_refs:
if ref() is f:
try:
file_refs.remove(ref)
except ValueError:
pass
return
def open(self, mode="r"): def open(self, mode="r"):
"""Returns a file(-like) object representing blob data."""
result = None
if mode not in valid_modes: if mode not in valid_modes:
raise ValueError("invalid mode", mode) raise ValueError("invalid mode", mode)
if self.writers:
raise BlobError("Already opened for writing.")
if mode == 'r': if mode == 'r':
if self._current_filename() is None: if self._current_filename() is None:
raise BlobError("Blob does not exist.") self._create_uncommitted_file()
if self._p_blob_writers != 0:
raise BlobError("Already opened for writing.")
self._p_blob_readers += 1
result = BlobFile(self._current_filename(), mode, self) result = BlobFile(self._current_filename(), mode, self)
elif mode == 'w': def destroyed(ref, readers=self.readers):
if self._p_blob_readers != 0: try:
raise BlobError("Already opened for reading.") readers.remove(ref)
except ValueError:
self._p_blob_writers += 1 pass
if self._p_blob_uncommitted is None:
self._create_uncommitted_file() self.readers.append(weakref.ref(result, destroyed))
result = BlobFile(self._p_blob_uncommitted, mode, self) else:
if self.readers:
elif mode in ('a', 'r+'):
if self._p_blob_readers != 0:
raise BlobError("Already opened for reading.") raise BlobError("Already opened for reading.")
if self._p_blob_uncommitted is None: if mode == 'w':
# Create a new working copy if self._p_blob_uncommitted is None:
uncommitted = BlobFile(self._create_uncommitted_file(), self._create_uncommitted_file()
mode, self) result = BlobFile(self._p_blob_uncommitted, mode, self)
# NOTE: _p_blob data appears by virtue of Connection._setstate
utils.cp(file(self._p_blob_data), uncommitted)
uncommitted.seek(0)
else: else:
# Re-use existing working copy if self._p_blob_uncommitted is None:
uncommitted = BlobFile(self._p_blob_uncommitted, mode, self) # Create a new working copy
self._create_uncommitted_file()
result = BlobFile(self._p_blob_uncommitted, mode, self)
utils.cp(file(self._p_blob_committed), result)
if mode == 'r+':
result.seek(0)
else:
# Re-use existing working copy
result = BlobFile(self._p_blob_uncommitted, mode, self)
self._p_blob_writers += 1 def destroyed(ref, writers=self.writers):
result = uncommitted try:
writers.remove(ref)
except ValueError:
pass
self.writers.append(weakref.ref(result, destroyed))
else: self._p_changed = True
raise IOError('invalid mode: %s ' % mode)
if result is not None:
self._setup_transaction_manager(result)
return result return result
def openDetached(self, class_=file): def openDetached(self, class_=file):
...@@ -123,7 +170,7 @@ class Blob(persistent.Persistent): ...@@ -123,7 +170,7 @@ class Blob(persistent.Persistent):
""" """
if self._current_filename() is None: if self._current_filename() is None:
raise BlobError("Blob does not exist.") raise BlobError("Blob does not exist.")
if self._p_blob_writers != 0: if self.writers:
raise BlobError("Already opened for writing.") raise BlobError("Already opened for writing.")
# XXX this should increase the reader number and have a test !?! # XXX this should increase the reader number and have a test !?!
return class_(self._current_filename(), "rb") return class_(self._current_filename(), "rb")
...@@ -132,9 +179,9 @@ class Blob(persistent.Persistent): ...@@ -132,9 +179,9 @@ class Blob(persistent.Persistent):
"""Will replace the current data of the blob with the file given under """Will replace the current data of the blob with the file given under
filename. filename.
""" """
if self._p_blob_writers != 0: if self.writers:
raise BlobError("Already opened for writing.") raise BlobError("Already opened for writing.")
if self._p_blob_readers != 0: if self.readers:
raise BlobError("Already opened for reading.") raise BlobError("Already opened for reading.")
previous_uncommitted = bool(self._p_blob_uncommitted) previous_uncommitted = bool(self._p_blob_uncommitted)
...@@ -151,7 +198,7 @@ class Blob(persistent.Persistent): ...@@ -151,7 +198,7 @@ class Blob(persistent.Persistent):
os.unlink(target) os.unlink(target)
try: try:
self._os_link(filename, target) os.rename(filename, target)
except: except:
# Recover from the failed consumption: First remove the file, it # Recover from the failed consumption: First remove the file, it
# might exist and mark the pointer to the uncommitted file. # might exist and mark the pointer to the uncommitted file.
...@@ -175,14 +222,14 @@ class Blob(persistent.Persistent): ...@@ -175,14 +222,14 @@ class Blob(persistent.Persistent):
# We changed the blob state and have to make sure we join the # We changed the blob state and have to make sure we join the
# transaction. # transaction.
self._change() self._p_changed = True
# utility methods # utility methods
def _current_filename(self): def _current_filename(self):
# NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of # NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
# Connection._setstate # Connection._setstate
return self._p_blob_uncommitted or self._p_blob_data return self._p_blob_uncommitted or self._p_blob_committed
def _create_uncommitted_file(self): def _create_uncommitted_file(self):
assert self._p_blob_uncommitted is None, ( assert self._p_blob_uncommitted is None, (
...@@ -191,148 +238,6 @@ class Blob(persistent.Persistent): ...@@ -191,148 +238,6 @@ class Blob(persistent.Persistent):
self._p_blob_uncommitted = utils.mktemp(dir=tempdir) self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
return self._p_blob_uncommitted return self._p_blob_uncommitted
def _change(self):
self._p_changed = 1
def _setup_transaction_manager(self, result):
# We join the transaction with our own data manager in order to be
# notified of commit/vote/abort events. We do this because at
# transaction boundaries, we need to fix up _p_ reference counts
# that keep track of open readers and writers and close any
# writable filehandles we've opened.
if self._p_blob_manager is None:
# Blobs need to always participate in transactions.
if self._p_jar is not None:
# If we are connected to a database, then we use the
# transaction manager that belongs to this connection
tm = self._p_jar.transaction_manager
else:
# If we are not connected to a database, we check whether
# we have been given an explicit transaction manager
if self._p_blob_transaction:
tm = self._p_blob_transaction
else:
# Otherwise we use the default
# transaction manager as an educated guess.
tm = transaction.manager
# Create our datamanager and join he current transaction.
dm = BlobDataManager(self, result, tm)
tm.get().join(dm)
elif result:
# Each blob data manager should manage only the one blob
# assigned to it. Assert that this is the case and it is the
# correct blob
assert self._p_blob_manager.blob is self
self._p_blob_manager.register_fh(result)
# utility methods which should not cause the object's state to be
# loaded if they are called while the object is a ghost. Thus,
# they are named with the _p_ convention and only operate against
# other _p_ instance attributes. We conventionally name these methods
# and attributes with a _p_blob prefix.
def _p_blob_clear(self):
self._p_blob_readers = 0
self._p_blob_writers = 0
def _p_blob_decref(self, mode):
if mode == 'r':
self._p_blob_readers = max(0, self._p_blob_readers - 1)
else:
assert mode in valid_modes, "Invalid mode %r" % mode
self._p_blob_writers = max(0, self._p_blob_writers - 1)
def _p_blob_refcounts(self):
# used by unit tests
return self._p_blob_readers, self._p_blob_writers
class BlobDataManager:
"""Special data manager to handle transaction boundaries for blobs.
Blobs need some special care-taking on transaction boundaries. As
a) the ghost objects might get reused, the _p_reader and _p_writer
refcount attributes must be set to a consistent state
b) the file objects might get passed out of the thread/transaction
and must deny any relationship to the original blob.
c) writable blob filehandles must be closed at the end of a txn so
as to not allow reuse between two transactions.
"""
zope.interface.implements(transaction.interfaces.IDataManager)
def __init__(self, blob, filehandle, tm):
self.blob = blob
self.transaction = tm.get()
# we keep a weakref to the file handle because we don't want to
# keep it alive if all other references to it die (e.g. in the
# case it's opened without assigning it to a name).
self.fhrefs = utils.WeakSet()
self.register_fh(filehandle)
self.sortkey = time.time()
self.prepared = False
# Blob specific methods
def register_fh(self, filehandle):
self.fhrefs.add(filehandle)
def _remove_uncommitted_data(self):
self.blob._p_blob_clear()
self.fhrefs.map(lambda fhref: fhref.close())
if (self.blob._p_blob_uncommitted is not None and
os.path.exists(self.blob._p_blob_uncommitted)):
os.unlink(self.blob._p_blob_uncommitted)
self.blob._p_blob_uncommitted = None
# IDataManager
def tpc_begin(self, transaction):
if self.prepared:
raise TypeError('Already prepared')
self._checkTransaction(transaction)
self.prepared = True
self.transaction = transaction
self.fhrefs.map(lambda fhref: fhref.close())
def commit(self, transaction):
if not self.prepared:
raise TypeError('Not prepared to commit')
self._checkTransaction(transaction)
self.transaction = None
self.prepared = False
self.blob._p_blob_clear()
def abort(self, transaction):
self.tpc_abort(transaction)
def tpc_abort(self, transaction):
self._checkTransaction(transaction)
if self.transaction is not None:
self.transaction = None
self.prepared = False
self._remove_uncommitted_data()
def tpc_finish(self, transaction):
pass
def tpc_vote(self, transaction):
pass
def sortKey(self):
return self.sortkey
def _checkTransaction(self, transaction):
if (self.transaction is not None and
self.transaction is not transaction):
raise TypeError("Transaction missmatch",
transaction, self.transaction)
class BlobFile(file): class BlobFile(file):
"""A BlobFile that holds a file handle to actual blob data. """A BlobFile that holds a file handle to actual blob data.
...@@ -349,35 +254,10 @@ class BlobFile(file): ...@@ -349,35 +254,10 @@ class BlobFile(file):
def __init__(self, name, mode, blob): def __init__(self, name, mode, blob):
super(BlobFile, self).__init__(name, mode+'b') super(BlobFile, self).__init__(name, mode+'b')
self.blob = blob self.blob = blob
self.close_called = False
def write(self, data):
super(BlobFile, self).write(data)
self.blob._change()
def writelines(self, lines):
super(BlobFile, self).writelines(lines)
self.blob._change()
def truncate(self, size=0):
super(BlobFile, self).truncate(size)
self.blob._change()
def close(self): def close(self):
# we don't want to decref twice self.blob.closed(self)
if not self.close_called: file.close(self)
self.blob._p_blob_decref(self.mode[:-1])
self.close_called = True
super(BlobFile, self).close()
def __del__(self):
# XXX we need to ensure that the file is closed at object
# expiration or our blob's refcount won't be decremented.
# This probably needs some work; I don't know if the names
# 'BlobFile' or 'super' will be available at program exit, but
# we'll assume they will be for now in the name of not
# muddying the code needlessly.
self.close()
_pid = str(os.getpid()) _pid = str(os.getpid())
......
...@@ -26,12 +26,10 @@ A blob implements the IBlob interface: ...@@ -26,12 +26,10 @@ A blob implements the IBlob interface:
>>> IBlob.providedBy(myblob) >>> IBlob.providedBy(myblob)
True True
Opening a new Blob for reading fails: We can open a new blob file for reading, but it won't have any data:
>>> myblob.open("r") >>> myblob.open("r").read()
Traceback (most recent call last): ''
...
BlobError: Blob does not exist.
But we can write data to a new Blob by opening it for writing: But we can write data to a new Blob by opening it for writing:
......
...@@ -35,21 +35,53 @@ Putting a Blob into a Connection works like any other Persistent object:: ...@@ -35,21 +35,53 @@ Putting a Blob into a Connection works like any other Persistent object::
>>> blob1 = Blob() >>> blob1 = Blob()
>>> blob1.open('w').write('this is blob 1') >>> blob1.open('w').write('this is blob 1')
>>> root1['blob1'] = blob1 >>> root1['blob1'] = blob1
>>> transaction.commit() >>> 'blob1' in root1
True
Aborting a blob add leaves the blob unchanged:
Aborting a transaction involving a blob write cleans up uncommitted >>> transaction.abort()
file data:: >>> 'blob1' in root1
False
>>> dead_blob = Blob() >>> blob1._p_oid
>>> dead_blob.open('w').write('this is a dead blob') >>> blob1._p_jar
>>> root1['dead_blob'] = dead_blob >>> blob1.open().read()
>>> fname = dead_blob._p_blob_uncommitted 'this is blob 1'
It doesn't clear the file because there is no previously committed version:
>>> fname = blob1._p_blob_uncommitted
>>> import os >>> import os
>>> os.path.exists(fname) >>> os.path.exists(fname)
True True
Let's put the blob back into the root and commit the change:
>>> root1['blob1'] = blob1
>>> transaction.commit()
Now, if we make a change and abort it, we'll return to the committed
state:
>>> os.path.exists(fname)
False
>>> blob1._p_blob_uncommitted
>>> blob1.open('w').write('this is new blob 1')
>>> blob1.open().read()
'this is new blob 1'
>>> fname = blob1._p_blob_uncommitted
>>> os.path.exists(fname)
True
>>> transaction.abort() >>> transaction.abort()
>>> os.path.exists(fname) >>> os.path.exists(fname)
False False
>>> blob1._p_blob_uncommitted
>>> blob1.open().read()
'this is blob 1'
Opening a blob gives us a filehandle. Getting data out of the Opening a blob gives us a filehandle. Getting data out of the
resulting filehandle is accomplished via the filehandle's read method:: resulting filehandle is accomplished via the filehandle's read method::
...@@ -57,66 +89,44 @@ resulting filehandle is accomplished via the filehandle's read method:: ...@@ -57,66 +89,44 @@ resulting filehandle is accomplished via the filehandle's read method::
>>> connection2 = database.open() >>> connection2 = database.open()
>>> root2 = connection2.root() >>> root2 = connection2.root()
>>> blob1a = root2['blob1'] >>> blob1a = root2['blob1']
>>> blob1a._p_blob_refcounts()
(0, 0)
>>>
>>> blob1afh1 = blob1a.open("r") >>> blob1afh1 = blob1a.open("r")
>>> blob1afh1.read() >>> blob1afh1.read()
'this is blob 1' 'this is blob 1'
>>> # The filehandle keeps a reference to its blob object
>>> blob1afh1.blob._p_blob_refcounts()
(1, 0)
Let's make another filehandle for read only to blob1a, this should bump Let's make another filehandle for read only to blob1a. Aach file
up its refcount by one, and each file handle has a reference to the handle has a reference to the (same) underlying blob::
(same) underlying blob::
>>> blob1afh2 = blob1a.open("r") >>> blob1afh2 = blob1a.open("r")
>>> blob1afh2.blob._p_blob_refcounts()
(2, 0)
>>> blob1afh1.blob._p_blob_refcounts()
(2, 0)
>>> blob1afh2.blob is blob1afh1.blob >>> blob1afh2.blob is blob1afh1.blob
True True
Let's close the first filehandle we got from the blob, this should decrease Let's close the first filehandle we got from the blob::
its refcount by one::
>>> blob1afh1.close() >>> blob1afh1.close()
>>> blob1a._p_blob_refcounts()
(1, 0)
Let's abort this transaction, and ensure that the filehandles that we Let's abort this transaction, and ensure that the filehandles that we
opened are now closed and that the filehandle refcounts on the blob opened are still open::
object are cleared::
>>> transaction.abort() >>> transaction.abort()
>>> blob1afh1.blob._p_blob_refcounts()
(0, 0)
>>> blob1afh2.blob._p_blob_refcounts()
(0, 0)
>>> blob1a._p_blob_refcounts()
(0, 0)
>>> blob1afh2.read() >>> blob1afh2.read()
Traceback (most recent call last): 'this is blob 1'
...
ValueError: I/O operation on closed file
If we open a blob for append, its write refcount should be nonzero. >>> blob1afh2.close()
Additionally, writing any number of bytes to the blobfile should
result in the blob being marked "dirty" in the connection (we just If we open a blob for append, writing any number of bytes to the
aborted above, so the object should be "clean" when we start):: blobfile should result in the blob being marked "dirty" in the
connection (we just aborted above, so the object should be "clean"
when we start)::
>>> bool(blob1a._p_changed) >>> bool(blob1a._p_changed)
False False
>>> blob1a.open('r').read() >>> blob1a.open('r').read()
'this is blob 1' 'this is blob 1'
>>> blob1afh3 = blob1a.open('a') >>> blob1afh3 = blob1a.open('a')
>>> blob1afh3.write('woot!')
>>> blob1a._p_blob_refcounts()
(0, 1)
>>> bool(blob1a._p_changed) >>> bool(blob1a._p_changed)
True True
>>> blob1afh3.write('woot!')
We can open more than one blob object during the course of a single We can open more than one blob object during the course of a single
transaction:: transaction::
...@@ -125,10 +135,6 @@ transaction:: ...@@ -125,10 +135,6 @@ transaction::
>>> blob2.open('w').write('this is blob 3') >>> blob2.open('w').write('this is blob 3')
>>> root2['blob2'] = blob2 >>> root2['blob2'] = blob2
>>> transaction.commit() >>> transaction.commit()
>>> blob2._p_blob_refcounts()
(0, 0)
>>> blob1._p_blob_refcounts()
(0, 0)
Since we committed the current transaction above, the aggregate Since we committed the current transaction above, the aggregate
changes we've made to blob, blob1a (these refer to the same object) and changes we've made to blob, blob1a (these refer to the same object) and
...@@ -200,7 +206,7 @@ int on 64-bit):: ...@@ -200,7 +206,7 @@ int on 64-bit)::
Savepoints and Blobs Savepoints and Blobs
-------------------- --------------------
We do support optimistic savepoints :: We do support optimistic savepoints:
>>> connection5 = database.open() >>> connection5 = database.open()
>>> root5 = connection5.root() >>> root5 = connection5.root()
...@@ -222,17 +228,16 @@ We do support optimistic savepoints :: ...@@ -222,17 +228,16 @@ We do support optimistic savepoints ::
"I'm a happy blob. And I'm singing." "I'm a happy blob. And I'm singing."
>>> transaction.get().commit() >>> transaction.get().commit()
We do not support non-optimistic savepoints:: We support optimistic savepoints too:
>>> blob_fh = root5['blob'].open("a") >>> root5['blob'].open("a").write(" And I'm dancing.")
>>> blob_fh.write(" And the weather is beautiful.")
>>> blob_fh.close()
>>> root5['blob'].open("r").read() >>> root5['blob'].open("r").read()
"I'm a happy blob. And I'm singing. And the weather is beautiful." "I'm a happy blob. And I'm singing. And I'm dancing."
>>> savepoint = transaction.savepoint() # doctest: +ELLIPSIS >>> savepoint = transaction.savepoint()
Traceback (most recent call last): >>> root5['blob'].open("w").write(" And the weather is beautiful.")
... >>> savepoint.rollback()
TypeError: ('Savepoints unsupported', <ZODB.blob.BlobDataManager instance at 0x...>) >>> root5['blob'].open("r").read()
"I'm a happy blob. And I'm singing. And I'm dancing."
>>> transaction.abort() >>> transaction.abort()
Reading Blobs outside of a transaction Reading Blobs outside of a transaction
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment