Commit d499112e authored by Jim Fulton's avatar Jim Fulton

Reverted accidental checkin.

parent 2c678116
......@@ -16,14 +16,12 @@
import base64
import logging
import logging
import os
import shutil
import sys
import tempfile
import threading
import time
import weakref
import tempfile
import logging
import zope.interface
......@@ -44,114 +42,78 @@ BLOB_SUFFIX = ".blob"
valid_modes = 'r', 'w', 'r+', 'a'
# Threading issues:
# We want to support closing blob files when they are destroyed.
# This introduces a threading issue, since a blob file may be destroyed
# via GC in any thread.
class Blob(persistent.Persistent):
"""A BLOB supports efficient handling of large data within ZODB."""
zope.interface.implements(ZODB.interfaces.IBlob)
_os_link = os.rename
_p_blob_readers = 0
_p_blob_writers = 0
_p_blob_uncommitted = None # Filename of the uncommitted (dirty) data
_p_blob_committed = None # Filename of the committed data
def __setstate__(self, state=None):
# We use lists here because it will allow is to add and remove
# atomically
self.readers = []
self.writers = []
__init__ = __setstate__
def __getstate__(self):
return None
def _p_deactivate(self):
# Only ghostify if we are unopened.
if self.readers or self.writers:
return
super(Blob, self)._p_deactivate()
def _p_invalidate(self):
# Force-close any open readers or writers,
# XXX should we warn of this? Maybe?
for ref in self.readers+self.writers:
f = ref()
if f is not None:
f.close()
super(Blob, self)._p_invalidate()
@property
def opened(self):
return bool(self.readers or self.writers)
def closed(self, f):
# We use try/except below because another thread might remove
# the ref after we check it if the file is GCed.
for file_refs in (self.readers, self.writers):
for ref in self.file_refs:
if ref() is f:
try:
file_refs.remove(ref)
except ValueError:
pass
return
_p_blob_data = None # Filename of the committed data
# All persistent object store a reference to their data manager, a database
# connection in the _p_jar attribute. So we are going to do the same with
# blobs here.
_p_blob_manager = None
# Blobs need to participate in transactions even when not connected to
# a database yet. If you want to use a non-default transaction manager,
# you can override it via _p_blob_transaction. This is currently
# required for unit testing.
_p_blob_transaction = None
def open(self, mode="r"):
"""Returns a file(-like) object representing blob data."""
result = None
if mode not in valid_modes:
raise ValueError("invalid mode", mode)
if self.writers:
raise BlobError("Already opened for writing.")
if mode == 'r':
if self._current_filename() is None:
self._create_uncommitted_file()
raise BlobError("Blob does not exist.")
if self._p_blob_writers != 0:
raise BlobError("Already opened for writing.")
self._p_blob_readers += 1
result = BlobFile(self._current_filename(), mode, self)
def destroyed(ref, readers=self.readers):
try:
readers.remove(ref)
except ValueError:
pass
self.readers.append(weakref.ref(result, destroyed))
else:
if self._p_blob_readers:
elif mode == 'w':
if self._p_blob_readers != 0:
raise BlobError("Already opened for reading.")
if mode == 'w':
if self._p_blob_uncommitted is None:
self._create_uncommitted_file()
result = BlobFile(self._p_blob_uncommitted, mode, self)
self._p_blob_writers += 1
if self._p_blob_uncommitted is None:
self._create_uncommitted_file()
result = BlobFile(self._p_blob_uncommitted, mode, self)
elif mode in ('a', 'r+'):
if self._p_blob_readers != 0:
raise BlobError("Already opened for reading.")
if self._p_blob_uncommitted is None:
# Create a new working copy
uncommitted = BlobFile(self._create_uncommitted_file(),
mode, self)
# NOTE: _p_blob data appears by virtue of Connection._setstate
utils.cp(file(self._p_blob_data), uncommitted)
uncommitted.seek(0)
else:
if self._p_blob_uncommitted is None:
# Create a new working copy
self._create_uncommitted_file()
result = BlobFile(self._p_blob_uncommitted, mode, self)
utils.cp(file(self._p_blob_committed), result)
if mode == 'r+':
result.seek(0)
else:
# Re-use existing working copy
result = BlobFile(self._p_blob_uncommitted, mode, self)
# Re-use existing working copy
uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
def destroyed(ref, writers=self.writers):
try:
writers.remove(ref)
except ValueError:
pass
self.writers.append(weakref.ref(result, destroyed))
self._p_blob_writers += 1
result = uncommitted
self._p_changed = True
else:
raise IOError('invalid mode: %s ' % mode)
if result is not None:
self._setup_transaction_manager(result)
return result
def openDetached(self, class_=file):
......@@ -189,7 +151,7 @@ class Blob(persistent.Persistent):
os.unlink(target)
try:
os.rename(filename, target)
self._os_link(filename, target)
except:
# Recover from the failed consumption: First remove the file, it
# might exist and mark the pointer to the uncommitted file.
......@@ -213,14 +175,14 @@ class Blob(persistent.Persistent):
# We changed the blob state and have to make sure we join the
# transaction.
self._p_changed = True
self._change()
# utility methods
def _current_filename(self):
# NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
# NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
# Connection._setstate
return self._p_blob_uncommitted or self._p_blob_committed
return self._p_blob_uncommitted or self._p_blob_data
def _create_uncommitted_file(self):
assert self._p_blob_uncommitted is None, (
......@@ -229,6 +191,148 @@ class Blob(persistent.Persistent):
self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
return self._p_blob_uncommitted
def _change(self):
self._p_changed = 1
def _setup_transaction_manager(self, result):
# We join the transaction with our own data manager in order to be
# notified of commit/vote/abort events. We do this because at
# transaction boundaries, we need to fix up _p_ reference counts
# that keep track of open readers and writers and close any
# writable filehandles we've opened.
if self._p_blob_manager is None:
# Blobs need to always participate in transactions.
if self._p_jar is not None:
# If we are connected to a database, then we use the
# transaction manager that belongs to this connection
tm = self._p_jar.transaction_manager
else:
# If we are not connected to a database, we check whether
# we have been given an explicit transaction manager
if self._p_blob_transaction:
tm = self._p_blob_transaction
else:
# Otherwise we use the default
# transaction manager as an educated guess.
tm = transaction.manager
# Create our datamanager and join he current transaction.
dm = BlobDataManager(self, result, tm)
tm.get().join(dm)
elif result:
# Each blob data manager should manage only the one blob
# assigned to it. Assert that this is the case and it is the
# correct blob
assert self._p_blob_manager.blob is self
self._p_blob_manager.register_fh(result)
# utility methods which should not cause the object's state to be
# loaded if they are called while the object is a ghost. Thus,
# they are named with the _p_ convention and only operate against
# other _p_ instance attributes. We conventionally name these methods
# and attributes with a _p_blob prefix.
def _p_blob_clear(self):
self._p_blob_readers = 0
self._p_blob_writers = 0
def _p_blob_decref(self, mode):
if mode == 'r':
self._p_blob_readers = max(0, self._p_blob_readers - 1)
else:
assert mode in valid_modes, "Invalid mode %r" % mode
self._p_blob_writers = max(0, self._p_blob_writers - 1)
def _p_blob_refcounts(self):
# used by unit tests
return self._p_blob_readers, self._p_blob_writers
class BlobDataManager:
"""Special data manager to handle transaction boundaries for blobs.
Blobs need some special care-taking on transaction boundaries. As
a) the ghost objects might get reused, the _p_reader and _p_writer
refcount attributes must be set to a consistent state
b) the file objects might get passed out of the thread/transaction
and must deny any relationship to the original blob.
c) writable blob filehandles must be closed at the end of a txn so
as to not allow reuse between two transactions.
"""
zope.interface.implements(transaction.interfaces.IDataManager)
def __init__(self, blob, filehandle, tm):
self.blob = blob
self.transaction = tm.get()
# we keep a weakref to the file handle because we don't want to
# keep it alive if all other references to it die (e.g. in the
# case it's opened without assigning it to a name).
self.fhrefs = utils.WeakSet()
self.register_fh(filehandle)
self.sortkey = time.time()
self.prepared = False
# Blob specific methods
def register_fh(self, filehandle):
self.fhrefs.add(filehandle)
def _remove_uncommitted_data(self):
self.blob._p_blob_clear()
self.fhrefs.map(lambda fhref: fhref.close())
if (self.blob._p_blob_uncommitted is not None and
os.path.exists(self.blob._p_blob_uncommitted)):
os.unlink(self.blob._p_blob_uncommitted)
self.blob._p_blob_uncommitted = None
# IDataManager
def tpc_begin(self, transaction):
if self.prepared:
raise TypeError('Already prepared')
self._checkTransaction(transaction)
self.prepared = True
self.transaction = transaction
self.fhrefs.map(lambda fhref: fhref.close())
def commit(self, transaction):
if not self.prepared:
raise TypeError('Not prepared to commit')
self._checkTransaction(transaction)
self.transaction = None
self.prepared = False
self.blob._p_blob_clear()
def abort(self, transaction):
self.tpc_abort(transaction)
def tpc_abort(self, transaction):
self._checkTransaction(transaction)
if self.transaction is not None:
self.transaction = None
self.prepared = False
self._remove_uncommitted_data()
def tpc_finish(self, transaction):
pass
def tpc_vote(self, transaction):
pass
def sortKey(self):
return self.sortkey
def _checkTransaction(self, transaction):
if (self.transaction is not None and
self.transaction is not transaction):
raise TypeError("Transaction missmatch",
transaction, self.transaction)
class BlobFile(file):
"""A BlobFile that holds a file handle to actual blob data.
......@@ -245,10 +349,35 @@ class BlobFile(file):
def __init__(self, name, mode, blob):
super(BlobFile, self).__init__(name, mode+'b')
self.blob = blob
self.close_called = False
def write(self, data):
super(BlobFile, self).write(data)
self.blob._change()
def writelines(self, lines):
super(BlobFile, self).writelines(lines)
self.blob._change()
def truncate(self, size=0):
super(BlobFile, self).truncate(size)
self.blob._change()
def close(self):
self.blob.closed(self)
file.close(self)
# we don't want to decref twice
if not self.close_called:
self.blob._p_blob_decref(self.mode[:-1])
self.close_called = True
super(BlobFile, self).close()
def __del__(self):
# XXX we need to ensure that the file is closed at object
# expiration or our blob's refcount won't be decremented.
# This probably needs some work; I don't know if the names
# 'BlobFile' or 'super' will be available at program exit, but
# we'll assume they will be for now in the name of not
# muddying the code needlessly.
self.close()
_pid = str(os.getpid())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment