Commit 523473ef authored by Chris McDonough's avatar Chris McDonough

Refactor some behavior of blob and blobfile (transparent), account for append mode not failing if a file doesn't exist (it acts like write mode in this situation).
parent 7e28ad1b
......@@ -12,11 +12,6 @@ import transaction
from transaction.interfaces import IDataManager
from persistent import Persistent
try:
from ZPublisher.Iterators import IStreamIterator
except ImportError:
IStreamIterator = None
class Blob(Persistent):
implements(IBlob)
......@@ -51,9 +46,6 @@ class Blob(Persistent):
result = BlobFile(self._p_blob_uncommitted, mode, self)
if mode.startswith("a"):
if self._current_filename() is None:
raise BlobError, "Blob does not exist."
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
......@@ -66,28 +58,60 @@ class Blob(Persistent):
else:
# Re-use existing working copy
uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
self._p_blob_writers +=1
result = uncommitted
if result is not None:
# we register ourselves as a data manager with the
# transaction machinery in order to be notified of
# commit/vote/abort events. We do this because at
# transaction boundaries, we need to fix up _p_ reference
# counts that keep track of open readers and writers and
# close any writable filehandles we've opened.
dm = BlobDataManager(self, result)
transaction.get().register(dm)
return result
return result
# utility methods
def _current_filename(self):
    """Return the filename backing this blob.

    Prefers the uncommitted working copy; falls back to the committed
    data file.  May be None when neither exists.
    """
    if self._p_blob_uncommitted:
        return self._p_blob_uncommitted
    return self._p_blob_data
def _change(self):
    # Mark this persistent object as dirty so the persistence
    # machinery writes it out at transaction commit.
    self._p_changed = 1
def _rc_clear(self):
    # Forget every outstanding reader/writer handle at once; used at
    # transaction boundaries to reset the refcount bookkeeping.
    self._p_blob_readers = self._p_blob_writers = 0
def _rc_decref(self, mode):
    """Drop one open-handle reference for a file opened with *mode*.

    Read modes ('r...' and 'U') decrement the reader count; write and
    append modes decrement the writer count.  Counts are clamped at
    zero so an over-close can never drive them negative.
    """
    is_read = mode.startswith('r') or mode == 'U'
    is_write = mode.startswith('w') or mode.startswith('a')
    if not (is_read or is_write):
        raise AssertionError('Unknown mode %s' % mode)
    if is_read:
        self._p_blob_readers = max(0, self._p_blob_readers - 1)
    else:
        self._p_blob_writers = max(0, self._p_blob_writers - 1)
def _get_refcounts(self):
    # Test helper: expose the current (readers, writers) pair.
    readers = self._p_blob_readers
    writers = self._p_blob_writers
    return (readers, writers)
class BlobDataManager:
    """Special data manager to handle transaction boundaries for blobs.

    Blobs need some special care taking on transaction boundaries. As

    a) the ghost objects might get reused, the _p_ reader and writer
       refcount attributes must be set to a consistent state
    b) the file objects might get passed out of the thread/transaction
       and must deny any relationship to the original blob.
    c) writable blob filehandles must be closed at the end of a txn so
       as to not allow reuse between two transactions.
    """

    implements(IDataManager)
......@@ -95,13 +119,8 @@ class BlobDataManager:
def __init__(self, blob, filehandle):
    """Track *filehandle*, handed out for *blob*, for this transaction."""
    self.blob = blob
    self.filehandle = filehandle
    # commit()/abort() consult ``subtransaction`` and sortKey() returns
    # ``sortkey`` -- the superseded ``isSub``/``_sortkey`` attributes and
    # the removed _cleanUpBlob helper (diff residue) are dropped.
    self.subtransaction = False
    self.sortkey = time.time()
def abort_sub(self, transaction):
    # Subtransaction aborts require no blob-specific cleanup.
    pass
......@@ -110,26 +129,29 @@ class BlobDataManager:
pass
def tpc_begin(self, transaction, subtransaction=False):
    # Remember whether this two-phase cycle belongs to a subtransaction;
    # commit()/abort() skip blob cleanup for subtransactions.  The
    # superseded ``self.isSub`` assignment (diff residue) is dropped.
    self.subtransaction = subtransaction
def tpc_abort(self, transaction):
    # Nothing to do at tpc_abort time: blob cleanup (refcount clear and
    # filehandle close) happens in abort().  The stale call to the
    # removed _cleanUpBlob helper (diff residue) is dropped.
    pass
def tpc_finish(self, transaction):
    # Reset the subtransaction flag so this manager is in a clean state
    # for the next (top-level) transaction cycle.
    self.subtransaction = False
def tpc_vote(self, transaction):
    # Voting needs no blob-specific work; cleanup is handled by
    # commit()/abort().  The stale ``isSub``/_cleanUpBlob branch
    # (diff residue referencing removed names) is dropped.
    pass
def commit(self, object, transaction):
    # At a top-level commit, drop every reader/writer refcount on the
    # blob and close the writable filehandle so it cannot be reused
    # across transactions.  A stray old ``pass`` (diff residue) is dropped.
    if not self.subtransaction:
        self.blob._rc_clear() # clear all blob refcounts
        self.filehandle.close()
def abort(self, object, transaction):
    # Mirror commit(): on abort, clear the blob's refcounts and close
    # the filehandle unless this is only a subtransaction boundary.
    # The stale call to the removed _cleanUpBlob helper is dropped.
    if not self.subtransaction:
        self.blob._rc_clear()
        self.filehandle.close()
def sortKey(self):
    # Commit-ordering key; the time.time() captured in __init__ keeps
    # managers roughly FIFO.  NOTE(review): not guaranteed unique across
    # managers -- confirm callers tolerate ties.  The superseded
    # ``self._sortkey`` return (diff residue) is dropped.
    return self.sortkey
def beforeCompletion(self, transaction):
    # Synchronizer hook; blobs do no work before transaction completion.
    pass
......@@ -138,59 +160,38 @@ class BlobDataManager:
pass
class BlobFile(file):
    """ A BlobFile is a file that can be used within a transaction boundary """

    # XXX those files should be created in the same partition as
    # the storage later puts them to avoid copying them ...

    # Advertise stream-iterator support only when ZPublisher was
    # importable at module load (IStreamIterator is None otherwise).
    if IStreamIterator is not None:
        __implements__ = (IStreamIterator,)
def __init__(self, name, mode, blob):
    """Open file *name* with *mode* and bind this handle to *blob*."""
    super(BlobFile, self).__init__(name, mode)
    # 64 KB chunk size used by the stream-iterator protocol (next()).
    self.streamsize = 1 << 16
    self.blob = blob
def _p_changed(self):
    # Propagate a dirty marker to the owning blob, if this handle is
    # still attached (cleanTransaction() sets self.blob to None).
    # NOTE(review): this is a method, not Persistent's _p_changed
    # attribute -- BlobFile subclasses file, not Persistent.
    if self.blob is not None:
        self.blob._p_changed = 1
def write(self, data):
    """Write *data* through to the underlying file and mark the blob dirty."""
    super(BlobFile, self).write(data)
    # Single dirty-marking path; the superseded self._p_changed() call
    # (diff residue) is dropped to avoid marking the blob twice.
    self.blob._change()
def writelines(self, lines):
    """Write *lines* through to the underlying file and mark the blob dirty."""
    super(BlobFile, self).writelines(lines)
    # Single dirty-marking path; the superseded self._p_changed() call
    # (diff residue) is dropped to avoid marking the blob twice.
    self.blob._change()
def truncate(self, size=0):
    """Truncate the file to *size* bytes (default 0, matching
    file.truncate's convention) and mark the blob dirty.

    The duplicated pre-change header requiring *size* (diff residue)
    is dropped; defaulting *size* is backward compatible.
    """
    super(BlobFile, self).truncate(size)
    self.blob._change()
def close(self):
    # Release this handle's refcount -- unless cleanTransaction() has
    # already detached us from the blob -- then close the real file.
    # The pre-refactor inline reader/writer decrement (diff residue)
    # is dropped; keeping it alongside _rc_decref would decrement twice.
    if self.blob is not None:
        self.blob._rc_decref(self.mode)
    super(BlobFile, self).close()
def cleanTransaction(self):
    # Sever the link to the blob so a filehandle that escapes the
    # transaction can no longer touch the original blob's state.
    self.blob = None
def next(self):
    """Stream-iterator protocol: return the next streamsize chunk.

    At EOF, release this handle's blob refcount (unless already
    detached) and raise StopIteration.  The pre-refactor inline
    ``_p_blob_readers -= 1`` (diff residue) is dropped; keeping it
    alongside _rc_decref would decrement twice at EOF.
    """
    data = self.read(self.streamsize)
    if not data:
        if self.blob is not None:
            self.blob._rc_decref(self.mode)
        raise StopIteration
    return data
def __len__(self):
    """Report the total size of the underlying file in bytes,
    leaving the current file position untouched."""
    here = self.tell()
    self.seek(0, 2)      # jump to EOF to learn the size
    size = self.tell()
    self.seek(here, 0)   # restore the original position
    return size
......@@ -15,8 +15,7 @@
Connection support for Blobs tests
==================================
Connections handle Blobs specially. To demonstrate that, we first
need a Blob with some data:
>>> from ZODB.Blobs.interfaces import IBlob
>>> from ZODB.Blobs.Blob import Blob
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment