Commit abc35238 authored by Chris McDonough's avatar Chris McDonough

First crack at merging ctheune-blobsupport into a recent trunk checkout (tests...

First crack at merging ctheune-blobsupport into a recent trunk checkout (tests have not yet been run).
parent 5780e312
...@@ -27,6 +27,7 @@ import time ...@@ -27,6 +27,7 @@ import time
import types import types
import logging import logging
from zope.interface import implements
from ZEO import ServerStub from ZEO import ServerStub
from ZEO.cache import ClientCache from ZEO.cache import ClientCache
from ZEO.TransactionBuffer import TransactionBuffer from ZEO.TransactionBuffer import TransactionBuffer
...@@ -34,8 +35,11 @@ from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError ...@@ -34,8 +35,11 @@ from ZEO.Exceptions import ClientStorageError, ClientDisconnected, AuthError
from ZEO.auth import get_module from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager from ZEO.zrpc.client import ConnectionManager
from ZODB.Blobs.BlobStorage import BLOB_SUFFIX, BLOB_DIRTY
from ZODB import POSException from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER from ZODB.loglevels import BLATHER
from ZODB.Blobs.interfaces import IBlobStorage
from persistent.TimeStamp import TimeStamp from persistent.TimeStamp import TimeStamp
logger = logging.getLogger('ZEO.ClientStorage') logger = logging.getLogger('ZEO.ClientStorage')
...@@ -93,6 +97,7 @@ class ClientStorage(object): ...@@ -93,6 +97,7 @@ class ClientStorage(object):
tpc_begin(). tpc_begin().
""" """
implements(IBlobStorage)
# Classes we instantiate. A subclass might override. # Classes we instantiate. A subclass might override.
TransactionBufferClass = TransactionBuffer TransactionBufferClass = TransactionBuffer
...@@ -106,7 +111,8 @@ class ClientStorage(object): ...@@ -106,7 +111,8 @@ class ClientStorage(object):
wait_for_server_on_startup=None, # deprecated alias for wait wait_for_server_on_startup=None, # deprecated alias for wait
wait=None, wait_timeout=None, wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0, read_only=0, read_only_fallback=0,
username='', password='', realm=None): username='', password='', realm=None,
blob_dir=tempfile.gettempdir()):
"""ClientStorage constructor. """ClientStorage constructor.
This is typically invoked from a custom_zodb.py file. This is typically invoked from a custom_zodb.py file.
...@@ -177,6 +183,11 @@ class ClientStorage(object): ...@@ -177,6 +183,11 @@ class ClientStorage(object):
password -- string with plaintext password to be used password -- string with plaintext password to be used
when authenticated. when authenticated.
realm -- not documented.
blob_dir -- directory path for blob data. 'blob data' is data that
is retrieved via the loadBlob API.
Note that the authentication protocol is defined by the server Note that the authentication protocol is defined by the server
and is detected by the ClientStorage upon connecting (see and is detected by the ClientStorage upon connecting (see
testConnection() and doAuth() for details). testConnection() and doAuth() for details).
...@@ -303,6 +314,8 @@ class ClientStorage(object): ...@@ -303,6 +314,8 @@ class ClientStorage(object):
# is executing. # is executing.
self._lock = threading.Lock() self._lock = threading.Lock()
self.blob_dir = blob_dir
# Decide whether to use non-temporary files # Decide whether to use non-temporary files
if client is not None: if client is not None:
dir = var or os.getcwd() dir = var or os.getcwd()
...@@ -885,6 +898,60 @@ class ClientStorage(object): ...@@ -885,6 +898,60 @@ class ClientStorage(object):
self._tbuf.store(oid, version, data) self._tbuf.store(oid, version, data)
return self._check_serials() return self._check_serials()
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
serials = self.store(oid, serial, data, version, txn)
blobfile = open(blobfilename, "rb")
while True:
chunk = blobfile.read(4096)
# even if the blobfile is completely empty, we need to call
# storeBlob at least once in order to be able to call
# storeBlobEnd successfully.
self._server.storeBlob(oid, serial, chunk, version, id(txn))
if not chunk:
self._server.storeBlobEnd(oid, serial, data, version, id(txn))
break
os.unlink(blobfilename)
return serials
def _getDirtyFilename(self, oid, serial):
"""Generate an intermediate filename for two-phase commit.
"""
return self._getCleanFilename(oid, serial) + "." + BLOB_DIRTY
def _getCleanFilename(self, oid, tid):
return os.path.join(self.blob_dir,
"%s-%s%s" % (utils.oid_repr(oid),
utils.tid_repr(tid),
BLOB_SUFFIX,)
)
def loadBlob(self, oid, serial, version):
blob_filename = self._getCleanFilename(oid, serial)
if os.path.exists(blob_filename): # XXX see race condition below
return blob_filename
self._load_lock.acquire()
try:
if self._server is None:
raise ClientDisconnected()
tempfilename = self._getDirtyFilename(oid, serial)
tempfile = open(tempfilename, "wb")
offset = 0
while True:
chunk = self._server.loadBlob(oid, serial, version, offset)
if not chunk:
break
offset += len(chunk)
tempfile.write(chunk)
tempfile.close()
utils.best_rename(tempfilename, blob_filename)
return blob_filename
finally:
self._load_lock.release()
def tpc_vote(self, txn): def tpc_vote(self, txn):
"""Storage API: vote on a transaction.""" """Storage API: vote on a transaction."""
if txn is not self._transaction: if txn is not self._transaction:
......
...@@ -216,6 +216,12 @@ class StorageServer: ...@@ -216,6 +216,12 @@ class StorageServer:
def storea(self, oid, serial, data, version, id): def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id) self.rpc.callAsync('storea', oid, serial, data, version, id)
def storeBlobEnd(self, oid, serial, data, version, id):
self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
def storeBlob(self, oid, serial, chunk, version, id):
self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
## ##
# Start two-phase commit for a transaction # Start two-phase commit for a transaction
# @param id id used by client to identify current transaction. The # @param id id used by client to identify current transaction. The
...@@ -255,6 +261,9 @@ class StorageServer: ...@@ -255,6 +261,9 @@ class StorageServer:
def load(self, oid, version): def load(self, oid, version):
return self.rpc.call('load', oid, version) return self.rpc.call('load', oid, version)
def loadBlob(self, oid, serial, version, offset):
return self.rpc.call('loadBlob', oid, serial, version, offset)
def getSerial(self, oid): def getSerial(self, oid):
return self.rpc.call('getSerial', oid) return self.rpc.call('getSerial', oid)
......
...@@ -42,7 +42,7 @@ from ZODB.ConflictResolution import ResolvedSerial ...@@ -42,7 +42,7 @@ from ZODB.ConflictResolution import ResolvedSerial
from ZODB.POSException import StorageError, StorageTransactionError from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.utils import u64, oid_repr from ZODB.utils import u64, oid_repr, mktemp
from ZODB.loglevels import BLATHER from ZODB.loglevels import BLATHER
logger = logging.getLogger('ZEO.StorageServer') logger = logging.getLogger('ZEO.StorageServer')
...@@ -93,6 +93,9 @@ class ZEOStorage: ...@@ -93,6 +93,9 @@ class ZEOStorage:
self.log_label = _label self.log_label = _label
self.authenticated = 0 self.authenticated = 0
self.auth_realm = auth_realm self.auth_realm = auth_realm
self.blob_transfer = {}
self.blob_log = []
self.blob_loads = {}
# The authentication protocol may define extra methods. # The authentication protocol may define extra methods.
self._extensions = {} self._extensions = {}
for func in self.extensions: for func in self.extensions:
...@@ -454,6 +457,49 @@ class ZEOStorage: ...@@ -454,6 +457,49 @@ class ZEOStorage:
self.stats.stores += 1 self.stats.stores += 1
self.txnlog.store(oid, serial, data, version) self.txnlog.store(oid, serial, data, version)
def storeBlobEnd(self, oid, serial, data, version, id):
key = (oid, id)
if key not in self.blob_transfer:
raise Exception, "Can't finish a non-started Blob"
tempname, tempfile = self.blob_transfer.pop(key)
tempfile.close()
self.blob_log.append((oid, serial, data, tempname, version))
def storeBlob(self, oid, serial, chunk, version, id):
# XXX check that underlying storage supports blobs
key = (oid, id)
if key not in self.blob_transfer:
tempname = mktemp()
tempfile = open(tempname, "wb")
self.blob_transfer[key] = (tempname, tempfile) # XXX Force close and remove them when Storage closes
else:
tempname, tempfile = self.blob_transfer[key]
tempfile.write(chunk)
def loadBlob(self, oid, serial, version, offset):
key = (oid, serial)
if not key in self.blob_loads:
self.blob_loads[key] = \
open(self.storage.loadBlob(oid, serial, version))
blobdata = self.blob_loads[key]
blobdata.seek(offset)
chunk = blobdata.read(4096)
if not chunk:
del self.blob_loads[key]
return chunk
# The following four methods return values, so they must acquire # The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning. # the storage lock and begin the transaction before returning.
...@@ -596,6 +642,13 @@ class ZEOStorage: ...@@ -596,6 +642,13 @@ class ZEOStorage:
# load oid, serial, data, version # load oid, serial, data, version
if not self._store(*loader.load()): if not self._store(*loader.load()):
break break
# Blob support
while self.blob_log:
oid, oldserial, data, blobfilename, version = self.blob_log.pop()
self.storage.storeBlob(oid, oldserial, data, blobfilename,
version, self.transaction,)
resp = self._thunk() resp = self._thunk()
if delay is not None: if delay is not None:
delay.reply(resp) delay.reply(resp)
......
...@@ -196,9 +196,65 @@ class MappingStorageTests(GenericTests): ...@@ -196,9 +196,65 @@ class MappingStorageTests(GenericTests):
def getConfig(self): def getConfig(self):
return """<mappingstorage 1/>""" return """<mappingstorage 1/>"""
test_classes = [OneTimeTests, class BlobAdaptedFileStorageTests(GenericTests):
FileStorageTests, """ZEO backed by a BlobStorage-adapted FileStorage."""
MappingStorageTests] def setUp(self):
self.blobdir = tempfile.mkdtemp()
super(BlobAdaptedFileStorageTests, self).setUp()
def tearDown(self):
import shutil
shutil.rmtree(self.blobdir)
super(BlobAdaptedFileStorageTests, self).tearDown()
def getConfig(self):
return """
<blobstorage 1>
blob-dir %s
<filestorage 2>
path %s
</filestorage>
</blobstorage>
""" % (self.blobdir, tempfile.mktemp())
def checkStoreBlob(self):
from ZODB.utils import oid_repr, tid_repr
from ZODB.Blobs.Blob import Blob
from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
handle_serials
import transaction
somedata = 'a' * 10
blob = Blob()
bd_fh = blob.open('w')
bd_fh.write(somedata)
bd_fh.close()
tfname = bd_fh.name
oid = self._storage.new_oid()
data = zodb_pickle(blob)
self.assert_(os.path.exists(tfname))
t = transaction.Transaction()
try:
self._storage.tpc_begin(t)
r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
self.assert_(not os.path.exists(tfname))
filename = os.path.join(self.blobdir, oid_repr(oid),
tid_repr(revid) + BLOB_SUFFIX)
self.assert_(os.path.exists(filename))
self.assertEqual(somedata, open(filename).read())
test_classes = [FileStorageTests, MappingStorageTests,
BlobAdaptedFileStorageTests]
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
......
import os
import time
import tempfile
import weakref
from zope.interface import implements
from ZODB.Blobs.interfaces import IBlob
from ZODB.Blobs.exceptions import BlobError
from ZODB import utils
import transaction
from transaction.interfaces import IDataManager
from persistent import Persistent
class Blob(Persistent):
implements(IBlob)
_p_blob_readers = 0
_p_blob_writers = 0
_p_blob_uncommitted = None
_p_blob_data = None
def open(self, mode="r"):
""" Returns a file(-like) object representing blob data. This
method will either return the file object, raise a BlobError
or an IOError. A file may be open for exclusive read any
number of times, but may not be opened simultaneously for read
and write during the course of a single transaction and may
not be opened for simultaneous writes during the course of a
single transaction. Additionally, the file handle which
results from this method call is unconditionally closed at
transaction boundaries and so may not be used across
transactions. """
result = None
if (mode.startswith("r") or mode=="U"):
if self._current_filename() is None:
raise BlobError, "Blob does not exist."
if self._p_blob_writers != 0:
raise BlobError, "Already opened for writing."
self._p_blob_readers += 1
result = BlobFile(self._current_filename(), mode, self)
elif mode.startswith("w"):
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
if self._p_blob_uncommitted is None:
self._p_blob_uncommitted = utils.mktemp()
self._p_blob_writers += 1
result = BlobFile(self._p_blob_uncommitted, mode, self)
elif mode.startswith("a"):
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
if self._p_blob_uncommitted is None:
# Create a new working copy
self._p_blob_uncommitted = utils.mktemp()
uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
# NOTE: _p_blob data appears by virtue of Connection._setstate
utils.cp(file(self._p_blob_data), uncommitted)
uncommitted.seek(0)
else:
# Re-use existing working copy
uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
self._p_blob_writers +=1
result = uncommitted
else:
raise IOError, 'invalid mode: %s ' % mode
if result is not None:
# we register ourselves as a data manager with the
# transaction machinery in order to be notified of
# commit/vote/abort events. We do this because at
# transaction boundaries, we need to fix up _p_ reference
# counts that keep track of open readers and writers and
# close any writable filehandles we've opened.
dm = BlobDataManager(self, result)
transaction.get().register(dm)
return result
# utility methods
def _current_filename(self):
# NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
# Connection._setstate
return self._p_blob_uncommitted or self._p_blob_data
def _change(self):
self._p_changed = 1
# utility methods which should not cause the object's state to be
# loaded if they are called while the object is a ghost. Thus,
# they are named with the _p_ convention and only operate against
# other _p_ instance attributes. We conventionally name these methods
# and attributes with a _p_blob prefix.
def _p_blob_clear(self):
self._p_blob_readers = 0
self._p_blob_writers = 0
def _p_blob_decref(self, mode):
if mode.startswith('r') or mode == 'U':
self._p_blob_readers = max(0, self._p_blob_readers - 1)
elif mode.startswith('w') or mode.startswith('a'):
self._p_blob_writers = max(0, self._p_blob_writers - 1)
else:
raise AssertionError, 'Unknown mode %s' % mode
def _p_blob_refcounts(self):
# used by unit tests
return self._p_blob_readers, self._p_blob_writers
class BlobDataManager:
"""Special data manager to handle transaction boundaries for blobs.
Blobs need some special care-taking on transaction boundaries. As
a) the ghost objects might get reused, the _p_ reader and writer
refcount attributes must be set to a consistent state
b) the file objects might get passed out of the thread/transaction
and must deny any relationship to the original blob.
c) writable blob filehandles must be closed at the end of a txn so
as to not allow reuse between two transactions.
"""
implements(IDataManager)
def __init__(self, blob, filehandle):
self.blob = blob
# we keep a weakref to the file handle because we don't want to
# keep it alive if all other references to it die (e.g. in the
# case it's opened without assigning it to a name).
self.fhref = weakref.ref(filehandle)
self.subtransaction = False
self.sortkey = time.time()
def abort_sub(self, transaction):
pass
def commit_sub(self, transaction):
pass
def tpc_begin(self, transaction, subtransaction=False):
self.subtransaction = subtransaction
def tpc_abort(self, transaction):
pass
def tpc_finish(self, transaction):
self.subtransaction = False
def tpc_vote(self, transaction):
pass
def commit(self, object, transaction):
if not self.subtransaction:
self.blob._p_blob_clear() # clear all blob refcounts
filehandle = self.fhref()
if filehandle is not None:
filehandle.close()
def abort(self, object, transaction):
if not self.subtransaction:
self.blob._p_blob_clear()
filehandle = self.fhref()
if filehandle is not None:
filehandle.close()
def sortKey(self):
return self.sortkey
def beforeCompletion(self, transaction):
pass
def afterCompletion(self, transaction):
pass
class BlobFile(file):
""" A BlobFile is a file that can be used within a transaction
boundary; a BlobFile is just a Python file object, we only
override methods which cause a change to blob data in order to
call methods on our 'parent' persistent blob object signifying
that the change happened. """
# XXX these files should be created in the same partition as
# the storage later puts them to avoid copying them ...
def __init__(self, name, mode, blob):
super(BlobFile, self).__init__(name, mode)
self.blob = blob
self.close_called = False
def write(self, data):
super(BlobFile, self).write(data)
self.blob._change()
def writelines(self, lines):
super(BlobFile, self).writelines(lines)
self.blob._change()
def truncate(self, size=0):
super(BlobFile, self).truncate(size)
self.blob._change()
def close(self):
# we don't want to decref twice
if not self.close_called:
self.blob._p_blob_decref(self.mode)
self.close_called = True
super(BlobFile, self).close()
def __del__(self):
# XXX we need to ensure that the file is closed at object
# expiration or our blob's refcount won't be decremented.
# This probably needs some work; I don't know if the names
# 'BlobFile' or 'super' will be available at program exit, but
# we'll assume they will be for now in the name of not
# muddying the code needlessly.
self.close()
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import shutil
import base64
from zope.interface import implements
from zope.proxy import ProxyBase, getProxiedObject
from ZODB import utils
from ZODB.Blobs.interfaces import IBlobStorage, IBlob
from ZODB.POSException import POSKeyError
BLOB_SUFFIX = ".blob"
BLOB_DIRTY = "store"
class BlobStorage(ProxyBase):
"""A storage to support blobs."""
implements(IBlobStorage)
__slots__ = ('base_directory', 'dirty_oids')
# XXX CM: what is the purpose of specifying __slots__ here?
def __new__(self, base_directory, storage):
return ProxyBase.__new__(self, storage)
def __init__(self, base_directory, storage):
# TODO Complain if storage is ClientStorage
ProxyBase.__init__(self, storage)
self.base_directory = base_directory
self.dirty_oids = []
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
"""Stores data that has a BLOB attached."""
serial = self.store(oid, oldserial, data, version, transaction)
assert isinstance(serial, str) # XXX in theory serials could be
# something else
self._lock_acquire()
try:
targetpath = self._getBlobPath(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
targetname = self._getCleanFilename(oid, serial)
utils.best_rename(blobfilename, targetname)
# XXX if oid already in there, something is really hosed.
# The underlying storage should have complained anyway
self.dirty_oids.append((oid, serial))
finally:
self._lock_release()
return self._tid
def _getDirtyFilename(self, oid):
"""Generate an intermediate filename for two-phase commit.
"""
return self._getCleanFilename(oid, BLOB_DIRTY)
def _getBlobPath(self, oid):
return os.path.join(self.base_directory,
utils.oid_repr(oid)
)
def _getCleanFilename(self, oid, tid):
return os.path.join(self._getBlobPath(oid),
"%s%s" % (utils.tid_repr(tid),
BLOB_SUFFIX,)
)
def _finish(self, tid, u, d, e):
ProxyBase._finish(self, tid, u, d, e)
# Move dirty blobs if they are "really" dirty
self.dirty_blobs = []
def _abort(self):
ProxyBase._abort(self)
# Throw away the stuff we'd had committed
while self.dirty_blobs:
oid, serial = self.dirty_blobs.pop()
clean = self._getCleanFilename(oid, serial)
dirty = self._getDirtyFilename(oid, serial)
for filename in [clean, dirty]:
if os.exists(filename):
os.unlink(filename)
def loadBlob(self, oid, serial, version):
"""Return the filename where the blob file can be found.
"""
filename = self._getCleanFilename(oid, serial)
if not os.path.exists(filename):
raise POSKeyError, "Not an existing blob."
return filename
def _packUndoing(self, packtime, referencesf):
# Walk over all existing revisions of all blob files and check
# if they are still needed by attempting to load the revision
# of that object from the database. This is maybe the slowest
# possible way to do this, but it's safe.
# XXX we should be tolerant of "garbage" directories/files in
# the base_directory here.
for oid_repr in os.listdir(self.base_directory):
oid = utils.repr_to_oid(oid_repr)
oid_path = os.path.join(self.base_directory, oid_repr)
files = os.listdir(oid_path)
files.sort()
for filename in files:
filepath = os.path.join(oid_path, filename)
whatever, serial = self._splitBlobFilename(filepath)
try:
fn = self._getCleanFilename(oid, serial)
self.loadSerial(oid, serial)
except POSKeyError:
os.unlink(filepath)
if not os.listdir(oid_path):
shutil.rmtree(oid_path)
def _packNonUndoing(self, packtime, referencesf):
for oid_repr in os.listdir(self.base_directory):
oid = utils.repr_to_oid(oid_repr)
oid_path = os.path.join(self.base_directory, oid_repr)
exists = True
try:
self.load(oid, None) # no version support
except (POSKeyError, KeyError):
exists = False
if exists:
files = os.listdir(oid_path)
files.sort()
latest = files[-1] # depends on ever-increasing tids
files.remove(latest)
for file in files:
os.unlink(os.path.join(oid_path, file))
else:
shutil.rmtree(oid_path)
continue
if not os.listdir(oid_path):
shutil.rmtree(oid_path)
def pack(self, packtime, referencesf):
"""Remove all unused oid/tid combinations."""
unproxied = getProxiedObject(self)
# pack the underlying storage, which will allow us to determine
# which serials are current.
result = unproxied.pack(packtime, referencesf)
# perform a pack on blob data
self._lock_acquire()
try:
if unproxied.supportsUndo():
self._packUndoing(packtime, referencesf)
else:
self._packNonUndoing(packtime, referencesf)
finally:
self._lock_release()
return result
def getSize(self):
"""Return the size of the database in bytes."""
orig_size = getProxiedObject(self).getSize()
blob_size = 0
for oid in os.listdir(self.base_directory):
for serial in os.listdir(os.path.join(self.base_directory, oid)):
if not serial.endswith(BLOB_SUFFIX):
continue
file_path = os.path.join(self.base_directory, oid, serial)
blob_size += os.stat(file_path).st_size
return orig_size + blob_size
def _splitBlobFilename(self, filename):
"""Returns OID, TID for a given blob filename.
If it's not a blob filename, (None, None) is returned.
"""
if not filename.endswith(BLOB_SUFFIX):
return None, None
path, filename = os.path.split(filename)
oid = os.path.split(path)[1]
serial = filename[:-len(BLOB_SUFFIX)]
oid = utils.repr_to_oid(oid)
if serial != BLOB_DIRTY:
serial = utils.repr_to_oid(serial)
else:
serial = None
return oid, serial
def undo(self, serial_id, transaction):
serial, keys = getProxiedObject(self).undo(serial_id, transaction)
self._lock_acquire()
try:
# The old serial_id is given in base64 encoding ...
serial_id = base64.decodestring(serial_id+ '\n')
for oid in self._getOIDsForSerial(serial_id):
data, serial_before, serial_after = \
self.loadBefore(oid, serial_id)
orig = file(self._getCleanFilename(oid, serial_before), "r")
new = file(self._getCleanFilename(oid, serial), "w")
utils.cp(orig, new)
orig.close()
new.close()
self.dirty_oids.append((oid, serial))
finally:
self._lock_release()
return serial, keys
def _getOIDsForSerial(self, search_serial):
oids = []
for oidpath in os.listdir(self.base_directory):
for filename in os.listdir(os.path.join(self.base_directory,
oidpath)):
blob_path = os.path.join(self.base_directory, oidpath,
filename)
oid, serial = self._splitBlobFilename(blob_path)
if search_serial == serial:
oids.append(oid)
return oids
Tests
-----
- ZConfig config testing (make sure that blob storage
config via ZConfig does the right thing)
- Test BlobStorage.getSize
- Test conflict behavior.
- Test shared client usage of blob cache dir.
- More ZEO tests.
Features
--------
- Ensure we detect and play a failed txn involving blobs forward or
backward at startup.
- Importing backward compatible ZEXP files (no \0BLOBSTART) used
- More options for blob directory structures (e.g. dirstorage's
bushy/chunky/lawn/flat).
- Log loudly on best_rename when it actually does a
copy.
- Unify ZEO/Blob implementation and the BlobProxyStorage directory
structures.
- Write code to clean up ClientStorage blob cache (it will grow without
bound currently).
- Allow "read-only" blob cache dirs from ClientStorages which can
point to a filesystem mount from the ZEO server of the canonical
blob locations.
Goal: Handle storage and retrieval of binary large objects efficiently,
transactionally, and transparently.
Measure:
- Don't block ZServer on uploads and downloads
- Don't hold BLOBS in memory or cache if not necessary (LRU caches tend
to break if we split BLOBs in lot of small objects. Size-based caches
tend to break on single large objects)
- Transparent for other systems, support normal ZODB operations.
Comments:
- Cache: BLOBs could be cached in a seperate "BLOB" space, e.g. in
single files
- Be storage independent?
- Memory efficiency: Storge.load() currently holds all data of an
object in a string.
Steps:
- simple aspects:
- blobs should be known by zodb
- storages, esp. clientstorage must be able to recognize blobs
- to avoid putting blob data into the client cache.
- blob data mustn't end up in the object cache
- blob object and blob data need to be handled separately
- blob data on client is stored in temporary files
- complicated aspects
- temporary files holding blob data could server as a
separated cache for blob data
- storage / zodb api change
Restrictions:
- a particular BLOB instance can't be open for read _and_ write at
the same time
- Allowed: N readers, no writers; 1 writer, no readers
- Reason:
- a writable filehandle opened via a BLOB's 'open' method has a
lifetime tied to the transaction in which the 'open' method was
called. We do this in order to prevent changes to blob data
from "bleeding over" between transactions.
- Data has been committed? -> File(name) for commited data available
- .open("r") on fresh loaded blob returns committed data
- first .open("w") -> new empty file for uncommitted data
- .open("a") or .open("r+"), we copy existing data into file for
uncommitted data
- if uncommitted data exists, subsequent .open("*") will use the
uncommitted data
- if opened for writing, the object is marked as changed
(optimiziation possible)
- connections want to recognize blobs on transaction boundaries
class BlobError(Exception):
pass
from zope.interface import Interface
class IBlob(Interface):
"""A BLOB supports efficient handling of large data within ZODB."""
def open(mode):
"""Returns a file(-like) object for handling the blob data.
mode: Mode to open the file with. Possible values: r,w,r+,a
"""
# XXX need a method to initialize the blob from the storage
# this means a) setting the _p_blob_data filename and b) putting
# the current data in that file
class IBlobStorage(Interface):
"""A storage supporting BLOBs."""
def storeBlob(oid, oldserial, data, blob, version, transaction):
"""Stores data that has a BLOB attached."""
def loadBlob(oid, serial, version):
"""Return the filename of the Blob data responding to this OID and
serial.
Returns a filename or None if no Blob data is connected with this OID.
Raises POSKeyError if the blobfile cannot be found.
"""
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
ZODB Blob support
=================
You create a blob like this:
>>> from ZODB.Blobs.Blob import Blob
>>> myblob = Blob()
A blob implements the IBlob interface:
>>> from ZODB.Blobs.interfaces import IBlob
>>> IBlob.providedBy(myblob)
True
Opening a new Blob for reading fails:
>>> myblob.open("r")
Traceback (most recent call last):
...
BlobError: Blob does not exist.
But we can write data to a new Blob by opening it for writing:
>>> f = myblob.open("w")
>>> f.write("Hi, Blob!")
If we try to open a Blob again while it is open for writing, we get an error:
>>> myblob.open("r")
Traceback (most recent call last):
...
BlobError: Already opened for writing.
We can close the file:
>>> f.close()
Now we can open it for reading:
>>> f2 = myblob.open("r")
And we get the data back:
>>> f2.read()
'Hi, Blob!'
If we want to, we can open it again:
>>> f3 = myblob.open("r")
>>> f3.read()
'Hi, Blob!'
But we can't open it for writing, while it is opened for reading:
>>> myblob.open("a")
Traceback (most recent call last):
...
BlobError: Already opened for reading.
Before we can write, we have to close the readers:
>>> f2.close()
>>> f3.close()
Now we can open it for writing again and e.g. append data:
>>> f4 = myblob.open("a")
>>> f4.write("\nBlob is fine.")
>>> f4.close()
Now we can read it:
>>> f4a = myblob.open("r")
>>> f4a.read()
'Hi, Blob!\nBlob is fine.'
>>> f4a.close()
You shouldn't need to explicitly close a blob unless you hold a reference
to it via a name. If the first line in the following test kept a reference
around via a name, the second call to open it in a writable mode would fail
with a BlobError, but it doesn't.
>>> myblob.open("r+").read()
'Hi, Blob!\nBlob is fine.'
>>> f4b = myblob.open("a")
>>> f4b.close()
We can read lines out of the blob too:
>>> f5 = myblob.open("r")
>>> f5.readline()
'Hi, Blob!\n'
>>> f5.readline()
'Blob is fine.'
>>> f5.close()
We can seek to certain positions in a blob and read portions of it:
>>> f6 = myblob.open('r')
>>> f6.seek(4)
>>> int(f6.tell())
4
>>> f6.read(5)
'Blob!'
>>> f6.close()
We can use the object returned by a blob open call as an iterable:
>>> f7 = myblob.open('r')
>>> for line in f7:
... print line
Hi, Blob!
<BLANKLINE>
Blob is fine.
>>> f7.close()
We can truncate a blob:
>>> f8 = myblob.open('a')
>>> f8.truncate(0)
>>> f8.close()
>>> f8 = myblob.open('r')
>>> f8.read()
''
>>> f8.close()
We can explicitly open Blobs in the different modified modes:
>>> f9 = myblob.open("rb")
>>> f9.mode
'rb'
>>> f9.close()
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Connection support for Blobs tests
==================================
Connections handle Blobs specially. To demonstrate that, we first need a Blob with some data:
>>> from ZODB.Blobs.interfaces import IBlob
>>> from ZODB.Blobs.Blob import Blob
>>> blob = Blob()
>>> data = blob.open("w")
>>> data.write("I'm a happy Blob.")
>>> data.close()
We also need a database with a blob supporting storage:
>>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.Blobs.BlobStorage import BlobStorage
>>> from ZODB.DB import DB
>>> from tempfile import mkdtemp
>>> base_storage = MappingStorage("test")
>>> blob_dir = mkdtemp()
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
Putting a Blob into a Connection works like every other object:
>>> connection = database.open()
>>> root = connection.root()
>>> root['myblob'] = blob
>>> import transaction
>>> transaction.commit()
Getting stuff out of there works similar:
>>> connection2 = database.open()
>>> root = connection2.root()
>>> blob2 = root['myblob']
>>> IBlob.providedBy(blob2)
True
>>> blob2.open("r").read()
"I'm a happy Blob."
You can't put blobs into a database that has uses a Non-Blob-Storage, though:
>>> no_blob_storage = MappingStorage()
>>> database2 = DB(no_blob_storage)
>>> connection3 = database2.open()
>>> root = connection3.root()
>>> root['myblob'] = Blob()
>>> transaction.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
Unsupported: Storing Blobs in <ZODB.MappingStorage.MappingStorage instance at ...> is not supported.
While we are testing this, we don't need the storage directory and databases anymore:
>>> import shutil
>>> shutil.rmtree(blob_dir)
>>> transaction.abort()
>>> database.close()
>>> database2.close()
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Import/export support for blob data
===================================
Set up:
>>> from ZODB.FileStorage import FileStorage
>>> from ZODB.Blobs.BlobStorage import BlobStorage
>>> from ZODB.Blobs.Blob import Blob
>>> from ZODB.DB import DB
>>> from persistent.mapping import PersistentMapping
>>> import shutil
>>> import transaction
>>> from tempfile import mkdtemp, mktemp
>>> storagefile1 = mktemp()
>>> blob_dir1 = mkdtemp()
>>> storagefile2 = mktemp()
>>> blob_dir2 = mkdtemp()
We need an database with an undoing blob supporting storage:
>>> base_storage1 = FileStorage(storagefile1)
>>> blob_storage1 = BlobStorage(blob_dir1, base_storage1)
>>> base_storage2 = FileStorage(storagefile2)
>>> blob_storage2 = BlobStorage(blob_dir2, base_storage2)
>>> database1 = DB(blob_storage1)
>>> database2 = DB(blob_storage2)
Create our root object for database1:
>>> connection1 = database1.open()
>>> root1 = connection1.root()
Put a couple blob objects in our database1 and on the filesystem:
>>> import time, os
>>> nothing = transaction.begin()
>>> tid = blob_storage1._tid
>>> data1 = 'x'*100000
>>> blob1 = Blob()
>>> blob1.open('w').write(data1)
>>> data2 = 'y'*100000
>>> blob2 = Blob()
>>> blob2.open('w').write(data2)
>>> d = PersistentMapping({'blob1':blob1, 'blob2':blob2})
>>> root1['blobdata'] = d
>>> transaction.commit()
Export our blobs from a database1 connection:
>>> conn = root1['blobdata']._p_jar
>>> oid = root1['blobdata']._p_oid
>>> exportfile = mktemp()
>>> nothing = connection1.exportFile(oid, exportfile)
Import our exported data into database2:
>>> connection2 = database2.open()
>>> root2 = connection2.root()
>>> nothing = transaction.begin()
>>> data = root2._p_jar.importFile(exportfile)
>>> root2['blobdata'] = data
>>> transaction.commit()
Make sure our data exists:
>>> items1 = root1['blobdata']
>>> items2 = root2['blobdata']
>>> bool(items1.keys() == items2.keys())
True
>>> items1['blob1'].open().read() == items2['blob1'].open().read()
True
>>> items1['blob2'].open().read() == items2['blob2'].open().read()
True
Clean up our blob directory:
>>> shutil.rmtree(blob_dir1)
>>> shutil.rmtree(blob_dir2)
>>> os.unlink(exportfile)
>>> os.unlink(storagefile1)
>>> os.unlink(storagefile2)
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Packing support for blob data
=============================
Set up:
>>> from ZODB.FileStorage import FileStorage
>>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.serialize import referencesf
>>> from ZODB.Blobs.BlobStorage import BlobStorage
>>> from ZODB.Blobs.Blob import Blob
>>> from ZODB import utils
>>> from ZODB.DB import DB
>>> import shutil
>>> import transaction
>>> from tempfile import mkdtemp, mktemp
>>> storagefile = mktemp()
>>> blob_dir = mkdtemp()
UNDOING
=======
We need an database with an undoing blob supporting storage:
>>> base_storage = FileStorage(storagefile)
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
Create our root object:
>>> connection1 = database.open()
>>> root = connection1.root()
Put some revisions of a blob object in our database and on the filesystem:
>>> import time, os
>>> tids = []
>>> times = []
>>> nothing = transaction.begin()
>>> times.append(time.time())
>>> blob = Blob()
>>> blob.open('w').write('this is blob data 0')
>>> root['blob'] = blob
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(time.time())
>>> root['blob'].open('w').write('this is blob data 1')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(time.time())
>>> root['blob'].open('w').write('this is blob data 2')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(time.time())
>>> root['blob'].open('w').write('this is blob data 3')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(time.time())
>>> root['blob'].open('w').write('this is blob data 4')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> oid = root['blob']._p_oid
>>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Get our blob filenames for this oid.
>>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
Do a pack to the slightly before the first revision was written:
>>> packtime = times[0]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Do a pack to the slightly before the second revision was written:
>>> packtime = times[1]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Do a pack to the slightly before the third revision was written:
>>> packtime = times[2]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, True, True, True, True]
Do a pack to the slightly before the fourth revision was written:
>>> packtime = times[3]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, True, True, True]
Do a pack to the slightly before the fifth revision was written:
>>> packtime = times[4]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, True, True]
Do a pack to now:
>>> packtime = time.time()
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, False, True]
Delete the object and do a pack, it should get rid of the most current
revision as well as the entire directory:
>>> nothing = transaction.begin()
>>> del root['blob']
>>> transaction.commit()
>>> packtime = time.time()
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, False, False]
>>> os.path.exists(os.path.split(fns[0])[0])
False
Clean up our blob directory and database:
>>> shutil.rmtree(blob_dir)
>>> os.unlink(storagefile)
NON-UNDOING
===========
We need an database with a NON-undoing blob supporting storage:
>>> base_storage = MappingStorage('storage')
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
Create our root object:
>>> connection1 = database.open()
>>> root = connection1.root()
Put some revisions of a blob object in our database and on the filesystem:
>>> import time, os
>>> tids = []
>>> times = []
>>> nothing = transaction.begin()
>>> times.append(time.time())
>>> blob = Blob()
>>> blob.open('w').write('this is blob data 0')
>>> root['blob'] = blob
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(time.time())
>>> root['blob'].open('w').write('this is blob data 1')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(time.time())
>>> root['blob'].open('w').write('this is blob data 2')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(time.time())
>>> root['blob'].open('w').write('this is blob data 3')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(time.time())
>>> root['blob'].open('w').write('this is blob data 4')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> oid = root['blob']._p_oid
>>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Get our blob filenames for this oid.
>>> fns = [ blob_storage._getCleanFilename(oid, x) for x in tids ]
Do a pack to the slightly before the first revision was written:
>>> packtime = times[0]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, False, True]
Do a pack to now:
>>> packtime = time.time()
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, False, True]
Delete the object and do a pack, it should get rid of the most current
revision as well as the entire directory:
>>> nothing = transaction.begin()
>>> del root['blob']
>>> transaction.commit()
>>> packtime = time.time()
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, False, False]
>>> os.path.exists(os.path.split(fns[0])[0])
False
Clean up our blob directory:
>>> shutil.rmtree(blob_dir)
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from zope.testing.doctestunit import DocFileSuite
def test_suite():
return DocFileSuite("basic.txt", "connection.txt", "transaction.txt",
"packing.txt", "importexport.txt")
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Transaction support for Blobs
=============================
We need a database with a blob supporting storage:
>>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.Blobs.BlobStorage import BlobStorage
>>> from ZODB.DB import DB
>>> import transaction
>>> from tempfile import mkdtemp
>>> base_storage = MappingStorage("test")
>>> blob_dir = mkdtemp()
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
Putting a Blob into a Connection works like any other Persistent object:
>>> connection1 = database.open()
>>> root1 = connection1.root()
>>> from ZODB.Blobs.Blob import Blob
>>> blob1 = Blob()
>>> blob1.open('w').write('this is blob 1')
>>> root1['blob1'] = blob1
>>> transaction.commit()
Opening a blob gives us a filehandle. Getting data out of the
resulting filehandle is accomplished via the filehandle's read method:
>>> connection2 = database.open()
>>> root2 = connection2.root()
>>> blob1a = root2['blob1']
>>> blob1a._p_blob_refcounts()
(0, 0)
>>>
>>> blob1afh1 = blob1a.open("r")
>>> blob1afh1.read()
'this is blob 1'
>>> # The filehandle keeps a reference to its blob object
>>> blob1afh1.blob._p_blob_refcounts()
(1, 0)
Let's make another filehandle for read only to blob1a, this should bump
up its refcount by one, and each file handle has a reference to the
(same) underlying blob:
>>> blob1afh2 = blob1a.open("r")
>>> blob1afh2.blob._p_blob_refcounts()
(2, 0)
>>> blob1afh1.blob._p_blob_refcounts()
(2, 0)
>>> blob1afh2.blob is blob1afh1.blob
True
Let's close the first filehandle we got from the blob, this should decrease
its refcount by one:
>>> blob1afh1.close()
>>> blob1a._p_blob_refcounts()
(1, 0)
Let's abort this transaction, and ensure that the filehandles that we
opened are now closed and that the filehandle refcounts on the blob
object are cleared.
>>> transaction.abort()
>>> blob1afh1.blob._p_blob_refcounts()
(0, 0)
>>> blob1afh2.blob._p_blob_refcounts()
(0, 0)
>>> blob1a._p_blob_refcounts()
(0, 0)
>>> blob1afh2.read()
Traceback (most recent call last):
...
ValueError: I/O operation on closed file
If we open a blob for append, its write refcount should be nonzero.
Additionally, writing any number of bytes to the blobfile should
result in the blob being marked "dirty" in the connection (we just
aborted above, so the object should be "clean" when we start):
>>> bool(blob1a._p_changed)
False
>>> blob1afh3 = blob1a.open('a')
>>> blob1afh3.write('woot!')
>>> blob1a._p_blob_refcounts()
(0, 1)
>>> bool(blob1a._p_changed)
True
We can open more than one blob object during the course of a single
transaction:
>>> blob2 = Blob()
>>> blob2.open('w').write('this is blob 3')
>>> root2['blob2'] = blob2
>>> transaction.commit()
>>> blob2._p_blob_refcounts()
(0, 0)
Since we committed the current transaction above, the aggregate
changes we've made to blob, blob1a (these refer to the same object) and
blob2 (a different object) should be evident:
>>> blob1.open('r').read()
'this is blob 1woot!'
>>> blob1a.open('r').read()
'this is blob 1woot!'
>>> blob2.open('r').read()
'this is blob 3'
We shouldn't be able to persist a blob filehandle at commit time
(although the exception which is raised when an object cannot be
pickled appears to be particulary unhelpful for casual users at the
moment):
>>> root1['wontwork'] = blob1.open('r')
>>> transaction.commit()
Traceback (most recent call last):
...
TypeError: coercing to Unicode: need string or buffer, BlobFile found
Abort for good measure:
>>> transaction.abort()
Attempting to change a blob simultaneously from two different
connections should result in a write conflict error.
>>> tm1 = transaction.TransactionManager()
>>> tm2 = transaction.TransactionManager()
>>> root3 = database.open(txn_mgr=tm1).root()
>>> root4 = database.open(txn_mgr=tm2).root()
>>> blob1c3 = root3['blob1']
>>> blob1c4 = root4['blob1']
>>> blob1c3fh1 = blob1c3.open('a')
>>> blob1c4fh1 = blob1c4.open('a')
>>> blob1c3fh1.write('this is from connection 3')
>>> blob1c4fh1.write('this is from connection 4')
>>> tm1.get().commit()
>>> tm2.get().commit()
Traceback (most recent call last):
...
ConflictError: database conflict error (oid 0x01, class ZODB.Blobs.Blob.Blob)
While we are testing this, we don't need the storage directory and databases
anymore:
>>> import shutil
>>> shutil.rmtree(blob_dir)
>>> tm1.get().abort()
>>> tm2.get().abort()
>>> database.close()
...@@ -27,6 +27,7 @@ from persistent import PickleCache ...@@ -27,6 +27,7 @@ from persistent import PickleCache
# interfaces # interfaces
from persistent.interfaces import IPersistentDataManager from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection from ZODB.interfaces import IConnection
from ZODB.Blobs.interfaces import IBlob, IBlobStorage
from transaction.interfaces import ISavepointDataManager from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint from transaction.interfaces import IDataManagerSavepoint
from transaction.interfaces import ISynchronizer from transaction.interfaces import ISynchronizer
...@@ -551,7 +552,23 @@ class Connection(ExportImport, object): ...@@ -551,7 +552,23 @@ class Connection(ExportImport, object):
raise ConflictError(object=obj) raise ConflictError(object=obj)
self._modified.append(oid) self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj p = writer.serialize(obj) # This calls __getstate__ of obj
s = self._storage.store(oid, serial, p, self._version, transaction)
if IBlob.providedBy(obj):
if not IBlobStorage.providedBy(self._storage):
raise Unsupported(
"Storing Blobs in %s is not supported." %
repr(self._storage))
s = self._storage.storeBlob(oid, serial, p,
obj._p_blob_uncommitted,
self._version, transaction)
# we invalidate the object here in order to ensure
# that that the next attribute access of its name
# unghostify it, which will cause its blob data
# to be reattached "cleanly"
obj._p_invalidate()
else:
s = self._storage.store(oid, serial, p, self._version,
transaction)
self._store_count += 1 self._store_count += 1
# Put the object in the cache before handling the # Put the object in the cache before handling the
# response, just in case the response contains the # response, just in case the response contains the
......
...@@ -13,13 +13,16 @@ ...@@ -13,13 +13,16 @@
############################################################################## ##############################################################################
"""Support for database export and import.""" """Support for database export and import."""
import os
from cStringIO import StringIO from cStringIO import StringIO
from cPickle import Pickler, Unpickler from cPickle import Pickler, Unpickler
from tempfile import TemporaryFile from tempfile import TemporaryFile
import logging import logging
from ZODB.POSException import ExportError from ZODB.POSException import ExportError, POSKeyError
from ZODB.utils import p64, u64 from ZODB.utils import p64, u64, cp, mktemp
from ZODB.Blobs.interfaces import IBlobStorage
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
logger = logging.getLogger('ZODB.ExportImport') logger = logging.getLogger('ZODB.ExportImport')
...@@ -49,6 +52,21 @@ class ExportImport: ...@@ -49,6 +52,21 @@ class ExportImport:
else: else:
referencesf(p, oids) referencesf(p, oids)
f.writelines([oid, p64(len(p)), p]) f.writelines([oid, p64(len(p)), p])
# Blob support
if not IBlobStorage.providedBy(self._storage):
continue
try:
blobfilename = self._storage.loadBlob(oid,
serial, self._version)
except POSKeyError: # Looks like this is not a blob
continue
f.write(blob_begin_marker)
f.write(p64(os.stat(blobfilename).st_size))
blobdata = open(blobfilename, "rb")
cp(blobdata, f)
blobdata.close()
f.write(export_end_marker) f.write(export_end_marker)
return f return f
...@@ -113,17 +131,20 @@ class ExportImport: ...@@ -113,17 +131,20 @@ class ExportImport:
version = self._version version = self._version
while 1: while 1:
h = f.read(16) header = f.read(16)
if h == export_end_marker: if header == export_end_marker:
break break
if len(h) != 16: if len(header) != 16:
raise ExportError("Truncated export file") raise ExportError("Truncated export file")
l = u64(h[8:16])
p = f.read(l) # Extract header information
if len(p) != l: ooid = header[:8]
length = u64(header[8:16])
data = f.read(length)
if len(data) != length:
raise ExportError("Truncated export file") raise ExportError("Truncated export file")
ooid = h[:8]
if oids: if oids:
oid = oids[ooid] oid = oids[ooid]
if isinstance(oid, tuple): if isinstance(oid, tuple):
...@@ -132,7 +153,21 @@ class ExportImport: ...@@ -132,7 +153,21 @@ class ExportImport:
oids[ooid] = oid = self._storage.new_oid() oids[ooid] = oid = self._storage.new_oid()
return_oid_list.append(oid) return_oid_list.append(oid)
pfile = StringIO(p) # Blob support
blob_begin = f.read(len(blob_begin_marker))
if blob_begin == blob_begin_marker:
# Copy the blob data to a temporary file
# and remember the name
blob_len = u64(f.read(8))
blob_filename = mktemp()
blob_file = open(blob_filename, "wb")
cp(f, blob_file, blob_len)
blob_file.close()
else:
f.seek(-len(blob_begin_marker),1)
blob_filename = None
pfile = StringIO(data)
unpickler = Unpickler(pfile) unpickler = Unpickler(pfile)
unpickler.persistent_load = persistent_load unpickler.persistent_load = persistent_load
...@@ -142,12 +177,17 @@ class ExportImport: ...@@ -142,12 +177,17 @@ class ExportImport:
pickler.dump(unpickler.load()) pickler.dump(unpickler.load())
pickler.dump(unpickler.load()) pickler.dump(unpickler.load())
p = newp.getvalue() data = newp.getvalue()
self._storage.store(oid, None, p, version, transaction) if blob_filename is not None:
self._storage.storeBlob(oid, None, data, blob_filename,
version, transaction)
else:
self._storage.store(oid, None, data, version, transaction)
export_end_marker = '\377'*16 export_end_marker = '\377'*16
blob_begin_marker = '\000BLOBSTART'
class Ghost(object): class Ghost(object):
__slots__ = ("oid",) __slots__ = ("oid",)
......
...@@ -628,7 +628,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -628,7 +628,7 @@ class FileStorage(BaseStorage.BaseStorage,
finally: finally:
self._lock_release() self._lock_release()
def store(self, oid, serial, data, version, transaction): def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
if transaction is not self._transaction: if transaction is not self._transaction:
...@@ -651,12 +651,12 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -651,12 +651,12 @@ class FileStorage(BaseStorage.BaseStorage,
pnv = h.pnv pnv = h.pnv
cached_tid = h.tid cached_tid = h.tid
if serial != cached_tid: if oldserial != cached_tid:
rdata = self.tryToResolveConflict(oid, cached_tid, rdata = self.tryToResolveConflict(oid, cached_tid,
serial, data) oldserial, data)
if rdata is None: if rdata is None:
raise POSException.ConflictError( raise POSException.ConflictError(
oid=oid, serials=(cached_tid, serial), data=data) oid=oid, serials=(cached_tid, oldserial), data=data)
else: else:
data = rdata data = rdata
...@@ -686,7 +686,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -686,7 +686,7 @@ class FileStorage(BaseStorage.BaseStorage,
raise FileStorageQuotaError( raise FileStorageQuotaError(
"The storage quota has been exceeded.") "The storage quota has been exceeded.")
if old and serial != cached_tid: if old and oldserial != cached_tid:
return ConflictResolution.ResolvedSerial return ConflictResolution.ResolvedSerial
else: else:
return self._tid return self._tid
......
...@@ -65,6 +65,11 @@ ...@@ -65,6 +65,11 @@
<sectiontype name="zeoclient" datatype=".ZEOClient" <sectiontype name="zeoclient" datatype=".ZEOClient"
implements="ZODB.storage"> implements="ZODB.storage">
<multikey name="server" datatype="socket-connection-address" required="yes"/> <multikey name="server" datatype="socket-connection-address" required="yes"/>
<key name="blob-dir" required="no" default="/tmp">
<description>
Path name to the blob storage directory.
</description>
</key>
<key name="storage" default="1"> <key name="storage" default="1">
<description> <description>
The name of the storage that the client wants to use. If the The name of the storage that the client wants to use. If the
...@@ -158,4 +163,18 @@ ...@@ -158,4 +163,18 @@
<key name="version-cache-size" datatype="integer" default="100"/> <key name="version-cache-size" datatype="integer" default="100"/>
</sectiontype> </sectiontype>
<sectiontype name="blobstorage" datatype=".BlobStorage"
implements="ZODB.storage">
<key name="blob-dir" required="yes">
<description>
Path name to the blob storage directory.
</description>
</key>
<section type="ZODB.storage" name="*" attribute="base"/>
</sectiontype>
</component> </component>
...@@ -132,6 +132,14 @@ class FileStorage(BaseConfig): ...@@ -132,6 +132,14 @@ class FileStorage(BaseConfig):
read_only=self.config.read_only, read_only=self.config.read_only,
quota=self.config.quota) quota=self.config.quota)
class BlobStorage(BaseConfig):
def open(self):
from ZODB.Blobs.BlobStorage import BlobStorage
base = self.config.base.open()
return BlobStorage(self.config.blob_dir, base)
class ZEOClient(BaseConfig): class ZEOClient(BaseConfig):
def open(self): def open(self):
...@@ -141,6 +149,7 @@ class ZEOClient(BaseConfig): ...@@ -141,6 +149,7 @@ class ZEOClient(BaseConfig):
L = [server.address for server in self.config.server] L = [server.address for server in self.config.server]
return ClientStorage( return ClientStorage(
L, L,
blob_dir=self.config.blob_dir,
storage=self.config.storage, storage=self.config.storage,
cache_size=self.config.cache_size, cache_size=self.config.cache_size,
name=self.config.name, name=self.config.name,
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Support for testing logging code
If you want to test that your code generates proper log output, you
can create and install a handler that collects output:
>>> handler = InstalledHandler('foo.bar')
The handler is installed into loggers for all of the names passed. In
addition, the logger level is set to 1, which means, log
everything. If you want to log less than everything, you can provide a
level keyword argument. The level setting effects only the named
loggers.
Then, any log output is collected in the handler:
>>> logging.getLogger('foo.bar').exception('eek')
>>> logging.getLogger('foo.bar').info('blah blah')
>>> for record in handler.records:
... print record.name, record.levelname
... print ' ', record.getMessage()
foo.bar ERROR
eek
foo.bar INFO
blah blah
A similar effect can be gotten by just printing the handler:
>>> print handler
foo.bar ERROR
eek
foo.bar INFO
blah blah
After checking the log output, you need to uninstall the handler:
>>> handler.uninstall()
At which point, the handler won't get any more log output.
Let's clear the handler:
>>> handler.clear()
>>> handler.records
[]
And then log something:
>>> logging.getLogger('foo.bar').info('blah')
and, sure enough, we still have no output:
>>> handler.records
[]
$Id: loggingsupport.py 28349 2004-11-06 00:10:32Z tim_one $
"""
import logging
class Handler(logging.Handler):
def __init__(self, *names, **kw):
logging.Handler.__init__(self)
self.names = names
self.records = []
self.setLoggerLevel(**kw)
def setLoggerLevel(self, level=1):
self.level = level
self.oldlevels = {}
def emit(self, record):
self.records.append(record)
def clear(self):
del self.records[:]
def install(self):
for name in self.names:
logger = logging.getLogger(name)
self.oldlevels[name] = logger.level
logger.setLevel(self.level)
logger.addHandler(self)
def uninstall(self):
for name in self.names:
logger = logging.getLogger(name)
logger.setLevel(self.oldlevels[name])
logger.removeHandler(self)
def __str__(self):
return '\n'.join(
[("%s %s\n %s" %
(record.name, record.levelname,
'\n'.join([line
for line in record.getMessage().split('\n')
if line.strip()])
)
)
for record in self.records]
)
class InstalledHandler(Handler):
def __init__(self, *names):
Handler.__init__(self, *names)
self.install()
...@@ -16,11 +16,13 @@ import sys ...@@ -16,11 +16,13 @@ import sys
import time import time
import struct import struct
from struct import pack, unpack from struct import pack, unpack
from binascii import hexlify from binascii import hexlify, unhexlify
import cPickle as pickle import cPickle as pickle
from cStringIO import StringIO from cStringIO import StringIO
import weakref import weakref
import warnings import warnings
from tempfile import mkstemp
import os
from persistent.TimeStamp import TimeStamp from persistent.TimeStamp import TimeStamp
...@@ -90,20 +92,33 @@ def u64(v): ...@@ -90,20 +92,33 @@ def u64(v):
U64 = u64 U64 = u64
def cp(f1, f2, l): def cp(f1, f2, length=None):
"""Copy all data from one file to another.
It copies the data from the current position of the input file (f1)
appending it to the current position of the output file (f2).
It copies at most 'length' bytes. If 'length' isn't given, it copies
until the end of the input file.
"""
read = f1.read read = f1.read
write = f2.write write = f2.write
n = 8192 n = 8192
while l > 0: if length is None:
if n > l: old_pos = f1.tell()
n = l f1.seek(0,2)
d = read(n) length = f1.tell()
if not d: f1.seek(old_pos)
while length > 0:
if n > length:
n = length
data = read(n)
if not data:
break break
write(d) write(data)
l = l - len(d) length -= len(data)
def newTimeStamp(old=None, def newTimeStamp(old=None,
TimeStamp=TimeStamp, TimeStamp=TimeStamp,
...@@ -128,6 +143,13 @@ def oid_repr(oid): ...@@ -128,6 +143,13 @@ def oid_repr(oid):
else: else:
return repr(oid) return repr(oid)
def repr_to_oid(repr):
if repr.startswith("0x"):
repr = repr[2:]
as_bin = unhexlify(repr)
as_bin = "\x00"*(8-len(as_bin)) + as_bin
return as_bin
serial_repr = oid_repr serial_repr = oid_repr
tid_repr = serial_repr tid_repr = serial_repr
...@@ -273,3 +295,35 @@ class WeakSet(object): ...@@ -273,3 +295,35 @@ class WeakSet(object):
# We're cheating by breaking into the internals of Python's # We're cheating by breaking into the internals of Python's
# WeakValueDictionary here (accessing its .data attribute). # WeakValueDictionary here (accessing its .data attribute).
return self.data.data.values() return self.data.data.values()
def mktemp():
"""Create a temp file, known by name, in a semi-secure manner."""
handle, filename = mkstemp()
os.close(handle)
return filename
def best_rename(sourcename, targetname):
""" Try to rename via os.rename, but if we can't (for instance, if the
source and target are on separate partitions/volumes), fall back to copying
the file and unlinking the original. """
try:
os.rename(sourcename, targetname)
except OSError:
# XXX CM: I don't think this is a good idea; maybe just fail
# here instead of doing a brute force copy? This is awfully
# expensive and people won't know it's happening without
# at least a warning. It also increases the possibility of a race
# condition: both the source and target filenames exist at the
# same time.
source = open(sourcename, "rb")
target = open(targetname, "wb")
while True:
chunk = source.read(1<<16)
if not chunk:
break
target.write(chunk)
source.close()
target.close()
os.unlink(sourcename)
zope.interface
zope.testing
# Packaging information for zpkg.
header proxy.h
<extension _zope_proxy_proxy>
source _zope_proxy_proxy.c
depends-on proxy.h
</extension>
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""More convenience functions for dealing with proxies.
$Id$
"""
from zope.interface import moduleProvides
from zope.proxy.interfaces import IProxyIntrospection
from types import ClassType
from zope.proxy._zope_proxy_proxy import *
from zope.proxy._zope_proxy_proxy import _CAPI
moduleProvides(IProxyIntrospection)
__all__ = tuple(IProxyIntrospection)
def ProxyIterator(p):
yield p
while isProxy(p):
p = getProxiedObject(p)
yield p
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Proxy-related interfaces.
$Id$
"""
from zope.interface import Interface
class IProxyIntrospection(Interface):
"""Provides methods for indentifying proxies and extracting proxied objects
"""
def isProxy(obj, proxytype=None):
"""Check whether the given object is a proxy
If proxytype is not None, checkes whether the object is
proxied by the given proxytype.
"""
def sameProxiedObjects(ob1, ob2):
"""Check whether ob1 and ob2 are the same or proxies of the same object
"""
def getProxiedObject(obj):
"""Get the proxied Object
If the object isn't proxied, then just return the object.
"""
def removeAllProxies(obj):
"""Get the proxied object with no proxies
If obj is not a proxied object, return obj.
The returned object has no proxies.
"""
def queryProxy(obj, proxytype, default=None):
"""Look for a proxy of the given type around the object
If no such proxy can be found, return the default.
"""
def queryInnerProxy(obj, proxytype, default=None):
"""Look for the inner-most proxy of the given type around the object
If no such proxy can be found, return the default.
If there is such a proxy, return the inner-most one.
"""
#ifndef _proxy_H_
#define _proxy_H_ 1
typedef struct {
PyObject_HEAD
PyObject *proxy_object;
} ProxyObject;
#define Proxy_GET_OBJECT(ob) (((ProxyObject *)(ob))->proxy_object)
typedef struct {
PyTypeObject *proxytype;
int (*check)(PyObject *obj);
PyObject *(*create)(PyObject *obj);
PyObject *(*getobject)(PyObject *proxy);
} ProxyInterface;
#ifndef PROXY_MODULE
/* These are only defined in the public interface, and are not
* available within the module implementation. There we use the
* classic Python/C API only.
*/
static ProxyInterface *_proxy_api = NULL;
static int
Proxy_Import(void)
{
if (_proxy_api == NULL) {
PyObject *m = PyImport_ImportModule("zope.proxy");
if (m != NULL) {
PyObject *tmp = PyObject_GetAttrString(m, "_CAPI");
if (tmp != NULL) {
if (PyCObject_Check(tmp))
_proxy_api = (ProxyInterface *)
PyCObject_AsVoidPtr(tmp);
Py_DECREF(tmp);
}
}
}
return (_proxy_api == NULL) ? -1 : 0;
}
#define ProxyType (*_proxy_api->proxytype)
#define Proxy_Check(obj) (_proxy_api->check((obj)))
#define Proxy_CheckExact(obj) ((obj)->ob_type == ProxyType)
#define Proxy_New(obj) (_proxy_api->create((obj)))
#define Proxy_GetObject(proxy) (_proxy_api->getobject((proxy)))
#endif /* PROXY_MODULE */
#endif /* _proxy_H_ */
#
# This file is necessary to make this directory a package.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment