Commit 04fddda1 authored by Christian Theune's avatar Christian Theune

- Fixed deprecation warning for connection.txt

 - added clean(er?) handling on cleaning up blobs on transaction boundaries and
   beeing more careful not to cause harmful states when leaking BlobFiles out
   of threads/transactions (e.g. to Medusa)
parent 67193c35
...@@ -235,7 +235,7 @@ class ZEOServer: ...@@ -235,7 +235,7 @@ class ZEOServer:
storage.close() storage.close()
except: # Keep going except: # Keep going
log("failed to close storage %r" % name, log("failed to close storage %r" % name,
level=logging.EXCEPTION, exc_info=True) level=logging.exception, exc_info=True)
# Signal names # Signal names
......
import os import os
import time
import tempfile import tempfile
from zope.interface import implements from zope.interface import implements
...@@ -7,6 +8,8 @@ from zope.interface import implements ...@@ -7,6 +8,8 @@ from zope.interface import implements
from ZODB.Blobs.interfaces import IBlob from ZODB.Blobs.interfaces import IBlob
from ZODB.Blobs.exceptions import BlobError from ZODB.Blobs.exceptions import BlobError
from ZODB import utils from ZODB import utils
import transaction
from transaction.interfaces import IDataManager
from persistent import Persistent from persistent import Persistent
try: try:
...@@ -25,6 +28,8 @@ class Blob(Persistent): ...@@ -25,6 +28,8 @@ class Blob(Persistent):
def open(self, mode): def open(self, mode):
"""Returns a file(-like) object for handling the blob data.""" """Returns a file(-like) object for handling the blob data."""
result = None
if mode == "r": if mode == "r":
if self._current_filename() is None: if self._current_filename() is None:
raise BlobError, "Blob does not exist." raise BlobError, "Blob does not exist."
...@@ -33,7 +38,7 @@ class Blob(Persistent): ...@@ -33,7 +38,7 @@ class Blob(Persistent):
raise BlobError, "Already opened for writing." raise BlobError, "Already opened for writing."
self._p_blob_readers += 1 self._p_blob_readers += 1
return BlobFile(self._current_filename(), "rb", self) result = BlobFile(self._current_filename(), "rb", self)
if mode == "w": if mode == "w":
if self._p_blob_readers != 0: if self._p_blob_readers != 0:
...@@ -43,7 +48,7 @@ class Blob(Persistent): ...@@ -43,7 +48,7 @@ class Blob(Persistent):
self._p_blob_uncommitted = utils.mktemp() self._p_blob_uncommitted = utils.mktemp()
self._p_blob_writers += 1 self._p_blob_writers += 1
return BlobFile(self._p_blob_uncommitted, "wb", self) result = BlobFile(self._p_blob_uncommitted, "wb", self)
if mode =="a": if mode =="a":
if self._current_filename() is None: if self._current_filename() is None:
...@@ -63,13 +68,75 @@ class Blob(Persistent): ...@@ -63,13 +68,75 @@ class Blob(Persistent):
uncommitted = BlobFile(self._p_blob_uncommitted, "ab", self) uncommitted = BlobFile(self._p_blob_uncommitted, "ab", self)
self._p_blob_writers +=1 self._p_blob_writers +=1
return uncommitted result = uncommitted
if result is not None:
dm = BlobDataManager(self, result)
transaction.get().register(dm)
return result
# utility methods # utility methods
def _current_filename(self): def _current_filename(self):
return self._p_blob_uncommitted or self._p_blob_data return self._p_blob_uncommitted or self._p_blob_data
class BlobDataManager:
"""Special data manager to handle transaction boundaries for blobs.
Blobs need some special care taking on transaction boundaries. As
a) the ghost objects might get reused, the _p_ attributes must be
set to a consistent state
b) the file objects might get passed out of the thread/transaction
and must deny any relationship to the original blob.
"""
implements(IDataManager)
def __init__(self, blob, filehandle):
self.blob = blob
self.filehandle = filehandle
self.isSub = False
self._sortkey = time.time()
def _cleanUpBlob(self):
self.blob._p_blob_readers = 0
self.blob._p_blob_writers = 0
self.filehandle.cleanTransaction()
def abort_sub(self, transaction):
pass
def commit_sub(self, transaction):
pass
def tpc_begin(self, transaction, subtransaction=False):
self.isSub = subtransaction
def tpc_abort(self, transaction):
self._cleanUpBlob()
def tpc_finish(self, transaction):
self.isSub = False
def tpc_vote(self, transaction):
if not self.isSub:
self._cleanUpBlob()
def commit(self, object, transaction):
pass
def abort(self, object, transaction):
self._cleanUpBlob()
def sortKey(self):
return self._sortkey
def beforeCompletion(self, transaction):
pass
def afterCompletion(self, transaction):
pass
class BlobFile(file): class BlobFile(file):
# XXX those files should be created in the same partition as # XXX those files should be created in the same partition as
...@@ -83,19 +150,24 @@ class BlobFile(file): ...@@ -83,19 +150,24 @@ class BlobFile(file):
self.blob = blob self.blob = blob
self.streamsize = 1<<16 self.streamsize = 1<<16
def _p_changed(self):
if self.blob is not None:
self.blob._p_changed = 1
def write(self, data): def write(self, data):
super(BlobFile, self).write(data) super(BlobFile, self).write(data)
self.blob._p_changed = 1 self._p_changed()
def writelines(self, lines): def writelines(self, lines):
super(BlobFile, self).writelines(lines) super(BlobFile, self).writelines(lines)
self.blob._p_changed = 1 self._p_changed()
def truncate(self, size): def truncate(self, size):
super(BlobFile, self).truncate(size) super(BlobFile, self).truncate(size)
self.blob._p_changed = 1 self._p_changed()
def close(self): def close(self):
if self.blob is not None:
if (self.mode.startswith("w") or if (self.mode.startswith("w") or
self.mode.startswith("a")): self.mode.startswith("a")):
self.blob._p_blob_writers -= 1 self.blob._p_blob_writers -= 1
...@@ -103,9 +175,13 @@ class BlobFile(file): ...@@ -103,9 +175,13 @@ class BlobFile(file):
self.blob._p_blob_readers -= 1 self.blob._p_blob_readers -= 1
super(BlobFile, self).close() super(BlobFile, self).close()
def cleanTransaction(self):
self.blob = None
def next(self): def next(self):
data = self.read(self.streamsize) data = self.read(self.streamsize)
if not data: if not data:
if self.blob is not None:
self.blob._p_blob_readers -= 1 self.blob._p_blob_readers -= 1
raise StopIteration raise StopIteration
return data return data
......
...@@ -49,7 +49,7 @@ Getting stuff out of there works similar: ...@@ -49,7 +49,7 @@ Getting stuff out of there works similar:
>>> connection2 = database.open() >>> connection2 = database.open()
>>> root = connection2.root() >>> root = connection2.root()
>>> blob2 = root['myblob'] >>> blob2 = root['myblob']
>>> IBlob.isImplementedBy(blob2) >>> IBlob.providedBy(blob2)
True True
>>> blob2.open("r").read() >>> blob2.open("r").read()
"I'm a happy Blob." "I'm a happy Blob."
......
...@@ -164,15 +164,18 @@ ...@@ -164,15 +164,18 @@
<key name="version-cache-size" datatype="integer" default="100"/> <key name="version-cache-size" datatype="integer" default="100"/>
</sectiontype> </sectiontype>
<sectiontype name="blobfilestorage" datatype=".BlobFileStorage" <sectiontype name="blobstorage" datatype=".BlobStorage"
implements="ZODB.storage" extends="filestorage"> implements="ZODB.storage">
<key name="blob-dir" required="yes"> <key name="blob-dir" required="yes">
<description> <description>
Path name to the blob storage directory. Path name to the blob storage directory.
</description> </description>
</key> </key>
<section type="ZODB.storage" name="*" attribute="base"/>
</sectiontype> </sectiontype>
</component> </component>
...@@ -132,13 +132,12 @@ class FileStorage(BaseConfig): ...@@ -132,13 +132,12 @@ class FileStorage(BaseConfig):
read_only=self.config.read_only, read_only=self.config.read_only,
quota=self.config.quota) quota=self.config.quota)
class BlobFileStorage(FileStorage): class BlobStorage(BaseConfig):
def open(self): def open(self):
from ZODB.Blobs.BlobStorage import BlobStorage from ZODB.Blobs.BlobStorage import BlobStorage
base_storage = FileStorage.open(self) base = self.config.base.open()
return BlobStorage(self.config.blob_dir, base_storage) return BlobStorage(self.config.blob_dir, base)
class ZEOClient(BaseConfig): class ZEOClient(BaseConfig):
......
...@@ -124,7 +124,7 @@ class IDataManager(zope.interface.Interface): ...@@ -124,7 +124,7 @@ class IDataManager(zope.interface.Interface):
transaction being committed. transaction being committed.
""" """
def commit(transaction): def commit(object, transaction):
"""Commit modifications to registered objects. """Commit modifications to registered objects.
Save the object as part of the data to be made persistent if Save the object as part of the data to be made persistent if
...@@ -134,7 +134,7 @@ class IDataManager(zope.interface.Interface): ...@@ -134,7 +134,7 @@ class IDataManager(zope.interface.Interface):
errors occur it saves the objects in the storage. errors occur it saves the objects in the storage.
""" """
def abort(transaction): def abort(object, transaction):
"""Abort a transaction and forget all changes. """Abort a transaction and forget all changes.
Abort must be called outside of a two-phase commit. Abort must be called outside of a two-phase commit.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment