Commit f64c571f authored by Jim Fulton's avatar Jim Fulton

Reverted accidental checkin.

parent 63b3f731
...@@ -16,14 +16,12 @@ ...@@ -16,14 +16,12 @@
import base64 import base64
import logging import logging
import logging
import os import os
import shutil import shutil
import sys import sys
import tempfile
import threading
import time import time
import weakref import tempfile
import logging
import zope.interface import zope.interface
...@@ -44,114 +42,78 @@ BLOB_SUFFIX = ".blob" ...@@ -44,114 +42,78 @@ BLOB_SUFFIX = ".blob"
valid_modes = 'r', 'w', 'r+', 'a' valid_modes = 'r', 'w', 'r+', 'a'
# Threading issues:
# We want to support closing blob files when they are destroyed.
# This introduces a threading issue, since a blob file may be destroyed
# via GC in any thread.
class Blob(persistent.Persistent): class Blob(persistent.Persistent):
"""A BLOB supports efficient handling of large data within ZODB.""" """A BLOB supports efficient handling of large data within ZODB."""
zope.interface.implements(ZODB.interfaces.IBlob) zope.interface.implements(ZODB.interfaces.IBlob)
_os_link = os.rename
_p_blob_readers = 0
_p_blob_writers = 0
_p_blob_uncommitted = None # Filename of the uncommitted (dirty) data _p_blob_uncommitted = None # Filename of the uncommitted (dirty) data
_p_blob_committed = None # Filename of the committed data _p_blob_data = None # Filename of the committed data
def __setstate__(self, state=None): # All persistent object store a reference to their data manager, a database
# We use lists here because it will allow is to add and remove # connection in the _p_jar attribute. So we are going to do the same with
# atomically # blobs here.
self.readers = [] _p_blob_manager = None
self.writers = []
# Blobs need to participate in transactions even when not connected to
__init__ = __setstate__ # a database yet. If you want to use a non-default transaction manager,
# you can override it via _p_blob_transaction. This is currently
def __getstate__(self): # required for unit testing.
return None _p_blob_transaction = None
def _p_deactivate(self):
# Only ghostify if we are unopened.
if self.readers or self.writers:
return
super(Blob, self)._p_deactivate()
def _p_invalidate(self):
# Force-close any open readers or writers,
# XXX should we warn of this? Maybe?
for ref in self.readers+self.writers:
f = ref()
if f is not None:
f.close()
super(Blob, self)._p_invalidate()
@property
def opened(self):
return bool(self.readers or self.writers)
def closed(self, f):
# We use try/except below because another thread might remove
# the ref after we check it if the file is GCed.
for file_refs in (self.readers, self.writers):
for ref in self.file_refs:
if ref() is f:
try:
file_refs.remove(ref)
except ValueError:
pass
return
def open(self, mode="r"): def open(self, mode="r"):
"""Returns a file(-like) object representing blob data."""
result = None
if mode not in valid_modes: if mode not in valid_modes:
raise ValueError("invalid mode", mode) raise ValueError("invalid mode", mode)
if self.writers:
raise BlobError("Already opened for writing.")
if mode == 'r': if mode == 'r':
if self._current_filename() is None: if self._current_filename() is None:
self._create_uncommitted_file() raise BlobError("Blob does not exist.")
if self._p_blob_writers != 0:
raise BlobError("Already opened for writing.")
self._p_blob_readers += 1
result = BlobFile(self._current_filename(), mode, self) result = BlobFile(self._current_filename(), mode, self)
def destroyed(ref, readers=self.readers): elif mode == 'w':
try: if self._p_blob_readers != 0:
readers.remove(ref)
except ValueError:
pass
self.readers.append(weakref.ref(result, destroyed))
else:
if self._p_blob_readers:
raise BlobError("Already opened for reading.") raise BlobError("Already opened for reading.")
if mode == 'w': self._p_blob_writers += 1
if self._p_blob_uncommitted is None: if self._p_blob_uncommitted is None:
self._create_uncommitted_file() self._create_uncommitted_file()
result = BlobFile(self._p_blob_uncommitted, mode, self) result = BlobFile(self._p_blob_uncommitted, mode, self)
elif mode in ('a', 'r+'):
if self._p_blob_readers != 0:
raise BlobError("Already opened for reading.")
if self._p_blob_uncommitted is None:
# Create a new working copy
uncommitted = BlobFile(self._create_uncommitted_file(),
mode, self)
# NOTE: _p_blob data appears by virtue of Connection._setstate
utils.cp(file(self._p_blob_data), uncommitted)
uncommitted.seek(0)
else: else:
if self._p_blob_uncommitted is None: # Re-use existing working copy
# Create a new working copy uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
self._create_uncommitted_file()
result = BlobFile(self._p_blob_uncommitted, mode, self)
utils.cp(file(self._p_blob_committed), result)
if mode == 'r+':
result.seek(0)
else:
# Re-use existing working copy
result = BlobFile(self._p_blob_uncommitted, mode, self)
def destroyed(ref, writers=self.writers): self._p_blob_writers += 1
try: result = uncommitted
writers.remove(ref)
except ValueError:
pass
self.writers.append(weakref.ref(result, destroyed))
self._p_changed = True else:
raise IOError('invalid mode: %s ' % mode)
if result is not None:
self._setup_transaction_manager(result)
return result return result
def openDetached(self, class_=file): def openDetached(self, class_=file):
...@@ -189,7 +151,7 @@ class Blob(persistent.Persistent): ...@@ -189,7 +151,7 @@ class Blob(persistent.Persistent):
os.unlink(target) os.unlink(target)
try: try:
os.rename(filename, target) self._os_link(filename, target)
except: except:
# Recover from the failed consumption: First remove the file, it # Recover from the failed consumption: First remove the file, it
# might exist and mark the pointer to the uncommitted file. # might exist and mark the pointer to the uncommitted file.
...@@ -213,14 +175,14 @@ class Blob(persistent.Persistent): ...@@ -213,14 +175,14 @@ class Blob(persistent.Persistent):
# We changed the blob state and have to make sure we join the # We changed the blob state and have to make sure we join the
# transaction. # transaction.
self._p_changed = True self._change()
# utility methods # utility methods
def _current_filename(self): def _current_filename(self):
# NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of # NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
# Connection._setstate # Connection._setstate
return self._p_blob_uncommitted or self._p_blob_committed return self._p_blob_uncommitted or self._p_blob_data
def _create_uncommitted_file(self): def _create_uncommitted_file(self):
assert self._p_blob_uncommitted is None, ( assert self._p_blob_uncommitted is None, (
...@@ -229,6 +191,148 @@ class Blob(persistent.Persistent): ...@@ -229,6 +191,148 @@ class Blob(persistent.Persistent):
self._p_blob_uncommitted = utils.mktemp(dir=tempdir) self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
return self._p_blob_uncommitted return self._p_blob_uncommitted
def _change(self):
self._p_changed = 1
def _setup_transaction_manager(self, result):
# We join the transaction with our own data manager in order to be
# notified of commit/vote/abort events. We do this because at
# transaction boundaries, we need to fix up _p_ reference counts
# that keep track of open readers and writers and close any
# writable filehandles we've opened.
if self._p_blob_manager is None:
# Blobs need to always participate in transactions.
if self._p_jar is not None:
# If we are connected to a database, then we use the
# transaction manager that belongs to this connection
tm = self._p_jar.transaction_manager
else:
# If we are not connected to a database, we check whether
# we have been given an explicit transaction manager
if self._p_blob_transaction:
tm = self._p_blob_transaction
else:
# Otherwise we use the default
# transaction manager as an educated guess.
tm = transaction.manager
# Create our datamanager and join he current transaction.
dm = BlobDataManager(self, result, tm)
tm.get().join(dm)
elif result:
# Each blob data manager should manage only the one blob
# assigned to it. Assert that this is the case and it is the
# correct blob
assert self._p_blob_manager.blob is self
self._p_blob_manager.register_fh(result)
# utility methods which should not cause the object's state to be
# loaded if they are called while the object is a ghost. Thus,
# they are named with the _p_ convention and only operate against
# other _p_ instance attributes. We conventionally name these methods
# and attributes with a _p_blob prefix.
def _p_blob_clear(self):
self._p_blob_readers = 0
self._p_blob_writers = 0
def _p_blob_decref(self, mode):
if mode == 'r':
self._p_blob_readers = max(0, self._p_blob_readers - 1)
else:
assert mode in valid_modes, "Invalid mode %r" % mode
self._p_blob_writers = max(0, self._p_blob_writers - 1)
def _p_blob_refcounts(self):
# used by unit tests
return self._p_blob_readers, self._p_blob_writers
class BlobDataManager:
"""Special data manager to handle transaction boundaries for blobs.
Blobs need some special care-taking on transaction boundaries. As
a) the ghost objects might get reused, the _p_reader and _p_writer
refcount attributes must be set to a consistent state
b) the file objects might get passed out of the thread/transaction
and must deny any relationship to the original blob.
c) writable blob filehandles must be closed at the end of a txn so
as to not allow reuse between two transactions.
"""
zope.interface.implements(transaction.interfaces.IDataManager)
def __init__(self, blob, filehandle, tm):
self.blob = blob
self.transaction = tm.get()
# we keep a weakref to the file handle because we don't want to
# keep it alive if all other references to it die (e.g. in the
# case it's opened without assigning it to a name).
self.fhrefs = utils.WeakSet()
self.register_fh(filehandle)
self.sortkey = time.time()
self.prepared = False
# Blob specific methods
def register_fh(self, filehandle):
self.fhrefs.add(filehandle)
def _remove_uncommitted_data(self):
self.blob._p_blob_clear()
self.fhrefs.map(lambda fhref: fhref.close())
if (self.blob._p_blob_uncommitted is not None and
os.path.exists(self.blob._p_blob_uncommitted)):
os.unlink(self.blob._p_blob_uncommitted)
self.blob._p_blob_uncommitted = None
# IDataManager
def tpc_begin(self, transaction):
if self.prepared:
raise TypeError('Already prepared')
self._checkTransaction(transaction)
self.prepared = True
self.transaction = transaction
self.fhrefs.map(lambda fhref: fhref.close())
def commit(self, transaction):
if not self.prepared:
raise TypeError('Not prepared to commit')
self._checkTransaction(transaction)
self.transaction = None
self.prepared = False
self.blob._p_blob_clear()
def abort(self, transaction):
self.tpc_abort(transaction)
def tpc_abort(self, transaction):
self._checkTransaction(transaction)
if self.transaction is not None:
self.transaction = None
self.prepared = False
self._remove_uncommitted_data()
def tpc_finish(self, transaction):
pass
def tpc_vote(self, transaction):
pass
def sortKey(self):
return self.sortkey
def _checkTransaction(self, transaction):
if (self.transaction is not None and
self.transaction is not transaction):
raise TypeError("Transaction missmatch",
transaction, self.transaction)
class BlobFile(file): class BlobFile(file):
"""A BlobFile that holds a file handle to actual blob data. """A BlobFile that holds a file handle to actual blob data.
...@@ -245,10 +349,35 @@ class BlobFile(file): ...@@ -245,10 +349,35 @@ class BlobFile(file):
def __init__(self, name, mode, blob): def __init__(self, name, mode, blob):
super(BlobFile, self).__init__(name, mode+'b') super(BlobFile, self).__init__(name, mode+'b')
self.blob = blob self.blob = blob
self.close_called = False
def write(self, data):
super(BlobFile, self).write(data)
self.blob._change()
def writelines(self, lines):
super(BlobFile, self).writelines(lines)
self.blob._change()
def truncate(self, size=0):
super(BlobFile, self).truncate(size)
self.blob._change()
def close(self): def close(self):
self.blob.closed(self) # we don't want to decref twice
file.close(self) if not self.close_called:
self.blob._p_blob_decref(self.mode[:-1])
self.close_called = True
super(BlobFile, self).close()
def __del__(self):
# XXX we need to ensure that the file is closed at object
# expiration or our blob's refcount won't be decremented.
# This probably needs some work; I don't know if the names
# 'BlobFile' or 'super' will be available at program exit, but
# we'll assume they will be for now in the name of not
# muddying the code needlessly.
self.close()
_pid = str(os.getpid()) _pid = str(os.getpid())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment