Commit 6385b299 authored by Christian Theune's avatar Christian Theune

- Merged ctheune-blob-merge-branch to trunk.

parent 624305d2
......@@ -25,6 +25,15 @@ Transactions
Clean up weird import dance with ZODB. This is unnecessary since the
transaction module stopped being imported in ZODB/__init__.py in rev 39622.
Blobs
-----
- (3.8a1) Added new blob feature. See the ZODB/Blobs directory for
documentation.
ZODB now handles (reasonably) large binary objects efficiently. Useful to
use from a few kilobytes to at least multiple hundred megabytes.
What's new on ZODB 3.7b2?
=========================
......
......@@ -27,6 +27,7 @@ import time
import types
import logging
from zope.interface import implements
from ZEO import ServerStub
from ZEO.cache import ClientCache
from ZEO.TransactionBuffer import TransactionBuffer
......@@ -35,7 +36,10 @@ from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
from ZODB.Blobs.interfaces import IBlobStorage
from ZODB.Blobs.Blob import FilesystemHelper
from persistent.TimeStamp import TimeStamp
logger = logging.getLogger('ZEO.ClientStorage')
......@@ -93,6 +97,7 @@ class ClientStorage(object):
tpc_begin().
"""
implements(IBlobStorage)
# Classes we instantiate. A subclass might override.
TransactionBufferClass = TransactionBuffer
......@@ -106,7 +111,8 @@ class ClientStorage(object):
wait_for_server_on_startup=None, # deprecated alias for wait
wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0,
username='', password='', realm=None):
username='', password='', realm=None,
blob_dir=None):
"""ClientStorage constructor.
This is typically invoked from a custom_zodb.py file.
......@@ -177,6 +183,11 @@ class ClientStorage(object):
password -- string with plaintext password to be used
when authenticated.
realm -- not documented.
blob_dir -- directory path for blob data. 'blob data' is data that
is retrieved via the loadBlob API.
Note that the authentication protocol is defined by the server
and is detected by the ClientStorage upon connecting (see
testConnection() and doAuth() for details).
......@@ -303,6 +314,18 @@ class ClientStorage(object):
# is executing.
self._lock = threading.Lock()
# XXX need to check for POSIX-ness here
if blob_dir is not None:
self.fshelper = FilesystemHelper(blob_dir)
self.fshelper.create()
self.fshelper.checkSecure()
else:
self.fshelper = None
# Initialize locks
self.blob_status_lock = threading.Lock()
self.blob_status = {}
# Decide whether to use non-temporary files
if client is not None:
dir = var or os.getcwd()
......@@ -866,6 +889,118 @@ class ClientStorage(object):
self._tbuf.store(oid, version, data)
return self._check_serials()
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
"""Storage API: store a blob object."""
serials = self.store(oid, serial, data, version, txn)
blobfile = open(blobfilename, "rb")
while True:
chunk = blobfile.read(1<<16)
# even if the blobfile is completely empty, we need to call
# storeBlob at least once in order to be able to call
# storeBlobEnd successfully.
self._server.storeBlob(oid, serial, chunk, version, id(txn))
if not chunk:
self._server.storeBlobEnd(oid, serial, data, version, id(txn))
break
blobfile.close()
os.unlink(blobfilename)
return serials
def _do_load_blob(self, oid, serial, version):
"""Do the actual loading from the RPC server."""
blob_filename = self.fshelper.getBlobFilename(oid, serial)
if self._server is None:
raise ClientDisconnected()
targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
# We write to a temporary file first, so we do not accidentally
# allow half-baked copies of this blob be loaded
tempfd, tempfilename = self.fshelper.blob_mkstemp(oid, serial)
tempfile = os.fdopen(tempfd, 'wb')
offset = 0
while True:
chunk = self._server.loadBlob(oid, serial, version, offset)
if not chunk:
break
offset += len(chunk)
tempfile.write(chunk)
tempfile.close()
# XXX will fail on Windows if file is open
os.rename(tempfilename, blob_filename)
return blob_filename
def loadBlob(self, oid, serial, version):
"""Loading a blob has to know about loading the same blob
from another thread as the same time.
1. Check if the blob is downloaded already
2. Check whether it is currently beeing downloaded
2a. Wait for other download to finish, return
3. If not beeing downloaded, start download
"""
if self.fshelper is None:
raise POSException.Unsupported("No blob cache directory is "
"configured.")
blob_filename = self.fshelper.getBlobFilename(oid, serial)
# Case 1: Blob is available already, just use it
if os.path.exists(blob_filename):
log2("Found blob %s/%s in cache." % (utils.oid_repr(oid),
utils.tid_repr(serial)), level=BLATHER)
return blob_filename
# Case 2,3: Blob might still be downloading or not there yet
# Try to get or create a lock for the downloading of this blob,
# identified by it's oid and serial
lock_key = (oid, serial)
# We need to make the check for an existing lock and the possible
# creation of a new one atomic, so there is another lock:
self.blob_status_lock.acquire()
try:
if not self.blob_status.has_key(oid):
self.blob_status[lock_key] = self.getBlobLock()
lock = self.blob_status[lock_key]
finally:
self.blob_status_lock.release()
# We acquire the lock to either start downloading, or wait
# for another download to finish
lock.acquire()
try:
# If there was another download that is finished by now,
# we just take the result.
if os.path.exists(blob_filename):
log2("Found blob %s/%s in cache after it was downloaded "
"from another thread." % (utils.oid_repr(oid),
utils.tid_repr(serial)), level=BLATHER)
return blob_filename
# Otherwise we download and use that
return self._do_load_blob(oid, serial, version)
finally:
# When done we remove the download lock ...
lock.release()
# And the status information isn't needed as well,
# but we have to use the second lock here as well, to avoid
# making the creation of this status lock non-atomic (see above)
self.blob_status_lock.acquire()
try:
del self.blob_status[lock_key]
finally:
self.blob_status_lock.release()
def getBlobLock(self):
# indirection to support unit testing
return Lock()
def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
if txn is not self._transaction:
......
......@@ -220,6 +220,12 @@ class StorageServer:
def storea(self, oid, serial, data, version, id):
self.rpc.callAsync('storea', oid, serial, data, version, id)
def storeBlobEnd(self, oid, serial, data, version, id):
self.rpc.callAsync('storeBlobEnd', oid, serial, data, version, id)
def storeBlob(self, oid, serial, chunk, version, id):
self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
##
# Start two-phase commit for a transaction
# @param id id used by client to identify current transaction. The
......@@ -262,6 +268,9 @@ class StorageServer:
def load(self, oid, version):
return self.rpc.call('load', oid, version)
def loadBlob(self, oid, serial, version, offset):
return self.rpc.call('loadBlob', oid, serial, version, offset)
def getSerial(self, oid):
return self.rpc.call('getSerial', oid)
......
......@@ -42,20 +42,24 @@ from ZODB.ConflictResolution import ResolvedSerial
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
from ZODB.utils import u64, oid_repr
from ZODB.utils import u64, oid_repr, mktemp
from ZODB.loglevels import BLATHER
logger = logging.getLogger('ZEO.StorageServer')
# TODO: This used to say "ZSS", which is now implied in the logger name.
# Can this be either set to str(os.getpid()) (if that makes sense) or removed?
_label = "" # default label used for logging.
def set_label():
"""Internal helper to reset the logging label (e.g. after fork())."""
global _label
_label = "%s" % os.getpid()
def log(message, level=logging.INFO, label=None, exc_info=False):
"""Internal helper to log a message."""
label = label or _label
......@@ -63,9 +67,11 @@ def log(message, level=logging.INFO, label=None, exc_info=False):
message = "(%s) %s" % (label, message)
logger.log(level, message, exc_info=exc_info)
class StorageServerError(StorageError):
"""Error reported when an unpicklable exception is raised."""
class ZEOStorage:
"""Proxy to underlying storage for a single remote client."""
......@@ -93,6 +99,9 @@ class ZEOStorage:
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
self.blob_transfer = {}
self.blob_log = []
self.blob_loads = {}
# The authentication protocol may define extra methods.
self._extensions = {}
for func in self.extensions:
......@@ -155,7 +164,6 @@ class ZEOStorage:
if record_iternext is not None:
self.record_iternext = record_iternext
try:
fn = self.storage.getExtensionMethods
except AttributeError:
......@@ -460,6 +468,38 @@ class ZEOStorage:
self.stats.stores += 1
self.txnlog.store(oid, serial, data, version)
def storeBlobEnd(self, oid, serial, data, version, id):
key = (oid, id)
if key not in self.blob_transfer:
raise Exception, "Can't finish a non-started Blob"
tempname, tempfile = self.blob_transfer.pop(key)
tempfile.close()
self.blob_log.append((oid, serial, data, tempname, version))
def storeBlob(self, oid, serial, chunk, version, id):
# XXX check that underlying storage supports blobs
key = (oid, id)
if key not in self.blob_transfer:
tempname = mktemp()
tempfile = open(tempname, "wb")
self.blob_transfer[key] = (tempname, tempfile) # XXX Force close and remove them when Storage closes
else:
tempname, tempfile = self.blob_transfer[key]
tempfile.write(chunk)
def loadBlob(self, oid, serial, version, offset):
key = (oid, serial)
if not key in self.blob_loads:
self.blob_loads[key] = \
open(self.storage.loadBlob(oid, serial, version))
blobdata = self.blob_loads[key]
blobdata.seek(offset)
chunk = blobdata.read(4096)
if not chunk:
del self.blob_loads[key]
return chunk
# The following four methods return values, so they must acquire
# the storage lock and begin the transaction before returning.
......@@ -602,6 +642,13 @@ class ZEOStorage:
# load oid, serial, data, version
if not self._store(*loader.load()):
break
# Blob support
while self.blob_log:
oid, oldserial, data, blobfilename, version = self.blob_log.pop()
self.storage.storeBlob(oid, oldserial, data, blobfilename,
version, self.transaction,)
resp = self._thunk()
if delay is not None:
delay.reply(resp)
......@@ -919,6 +966,7 @@ class StorageServer:
if conn.obj in cl:
cl.remove(conn.obj)
class StubTimeoutThread:
def begin(self, client):
......@@ -987,11 +1035,13 @@ class TimeoutThread(threading.Thread):
else:
time.sleep(howlong)
def run_in_thread(method, *args):
t = SlowMethodThread(method, args)
t.start()
return t.delay
class SlowMethodThread(threading.Thread):
"""Thread to run potentially slow storage methods.
......
......@@ -23,6 +23,7 @@ import socket
import tempfile
import time
import unittest
import shutil
# ZODB test support
import ZODB
......@@ -141,14 +142,16 @@ class GenericTests(
self._pids = [pid]
self._servers = [adminaddr]
self._conf_path = path
self.blob_cache_dir = tempfile.mkdtemp() # This is the blob cache for ClientStorage
self._storage = ClientStorage(zport, '1', cache_size=20000000,
min_disconnect_poll=0.5, wait=1,
wait_timeout=60)
wait_timeout=60, blob_dir=self.blob_cache_dir)
self._storage.registerDB(DummyDB(), None)
def tearDown(self):
self._storage.close()
os.remove(self._conf_path)
shutil.rmtree(self.blob_cache_dir)
for server in self._servers:
forker.shutdown_zeo_server(server)
if hasattr(os, 'waitpid'):
......@@ -210,7 +213,6 @@ class MappingStorageTests(GenericTests):
def getConfig(self):
return """<mappingstorage 1/>"""
class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
"""Make sure a heartbeat is being sent and that it does no harm
......@@ -395,6 +397,139 @@ test_classes = [OneTimeTests,
ConnectionInvalidationOnReconnect,
]
class BlobAdaptedFileStorageTests(GenericTests):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
def setUp(self):
self.blobdir = tempfile.mkdtemp() # This is the blob directory on the ZEO server
self.filestorage = tempfile.mktemp()
super(BlobAdaptedFileStorageTests, self).setUp()
def tearDown(self):
super(BlobAdaptedFileStorageTests, self).tearDown()
shutil.rmtree(self.blobdir)
def getConfig(self):
return """
<blobstorage 1>
blob-dir %s
<filestorage 2>
path %s
</filestorage>
</blobstorage>
""" % (self.blobdir, self.filestorage)
def checkStoreBlob(self):
from ZODB.utils import oid_repr, tid_repr
from ZODB.Blobs.Blob import Blob
from ZODB.Blobs.BlobStorage import BLOB_SUFFIX
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
handle_serials
import transaction
somedata = 'a' * 10
blob = Blob()
bd_fh = blob.open('w')
bd_fh.write(somedata)
bd_fh.close()
tfname = bd_fh.name
oid = self._storage.new_oid()
data = zodb_pickle(blob)
self.assert_(os.path.exists(tfname))
t = transaction.Transaction()
try:
self._storage.tpc_begin(t)
r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
r2 = self._storage.tpc_vote(t)
revid = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
self.assert_(not os.path.exists(tfname))
filename = os.path.join(self.blobdir, oid_repr(oid),
tid_repr(revid) + BLOB_SUFFIX)
self.assert_(os.path.exists(filename))
self.assertEqual(somedata, open(filename).read())
def checkLoadBlob(self):
from ZODB.Blobs.Blob import Blob
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
handle_serials
import transaction
version = ''
somedata = 'a' * 10
blob = Blob()
bd_fh = blob.open('w')
bd_fh.write(somedata)
bd_fh.close()
tfname = bd_fh.name
oid = self._storage.new_oid()
data = zodb_pickle(blob)
t = transaction.Transaction()
try:
self._storage.tpc_begin(t)
r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
r2 = self._storage.tpc_vote(t)
serial = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
class Dummy:
def __init__(self):
self.acquired = 0
self.released = 0
def acquire(self):
self.acquired += 1
def release(self):
self.released += 1
class statusdict(dict):
def __init__(self):
self.added = []
self.removed = []
def __setitem__(self, k, v):
self.added.append(k)
super(statusdict, self).__setitem__(k, v)
def __delitem__(self, k):
self.removed.append(k)
super(statusdict, self).__delitem__(k)
# ensure that we do locking properly
filename = self._storage.fshelper.getBlobFilename(oid, serial)
thestatuslock = self._storage.blob_status_lock = Dummy()
thebloblock = Dummy()
def getBlobLock():
return thebloblock
# override getBlobLock to test that locking is performed
self._storage.getBlobLock = getBlobLock
thestatusdict = self._storage.blob_status = statusdict()
filename = self._storage.loadBlob(oid, serial, version)
self.assertEqual(thestatuslock.acquired, 2)
self.assertEqual(thestatuslock.released, 2)
self.assertEqual(thebloblock.acquired, 1)
self.assertEqual(thebloblock.released, 1)
self.assertEqual(thestatusdict.added, [(oid, serial)])
self.assertEqual(thestatusdict.removed, [(oid, serial)])
test_classes = [FileStorageTests, MappingStorageTests,
BlobAdaptedFileStorageTests]
def test_suite():
suite = unittest.TestSuite()
for klass in test_classes:
......
##############################################################################
#
# Copyright (c) 2005-2006 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""The blob class and related utilities.
"""
__docformat__ = "reStructuredText"
import os
import time
import tempfile
import logging
import zope.interface
from ZODB.Blobs.interfaces import IBlob
from ZODB.Blobs.exceptions import BlobError
from ZODB import utils
import transaction
import transaction.interfaces
from persistent import Persistent
BLOB_SUFFIX = ".blob"
class Blob(Persistent):
zope.interface.implements(IBlob)
_p_blob_readers = 0
_p_blob_writers = 0
_p_blob_uncommitted = None # Filename of the uncommitted (dirty) data
_p_blob_data = None # Filename of the committed data
# All persistent object store a reference to their data manager, a database
# connection in the _p_jar attribute. So we are going to do the same with
# blobs here.
_p_blob_manager = None
# Blobs need to participate in transactions even when not connected to
# a database yet. If you want to use a non-default transaction manager,
# you can override it via _p_blob_transaction. This is currently
# required for unit testing.
_p_blob_transaction = None
def open(self, mode="r"):
"""Returns a file(-like) object representing blob data."""
tempdir = os.environ.get('ZODB_BLOB_TEMPDIR', tempfile.gettempdir())
result = None
if (mode.startswith("r") or mode=="U"):
if self._current_filename() is None:
raise BlobError, "Blob does not exist."
if self._p_blob_writers != 0:
raise BlobError, "Already opened for writing."
self._p_blob_readers += 1
result = BlobFile(self._current_filename(), mode, self)
elif mode.startswith("w"):
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
if self._p_blob_uncommitted is None:
self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
self._p_blob_writers += 1
result = BlobFile(self._p_blob_uncommitted, mode, self)
elif mode.startswith("a"):
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
if self._p_blob_uncommitted is None:
# Create a new working copy
self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
# NOTE: _p_blob data appears by virtue of Connection._setstate
utils.cp(file(self._p_blob_data), uncommitted)
uncommitted.seek(0)
else:
# Re-use existing working copy
uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
self._p_blob_writers += 1
result = uncommitted
else:
raise IOError, 'invalid mode: %s ' % mode
if result is not None:
# We join the transaction with our own data manager in order to be
# notified of commit/vote/abort events. We do this because at
# transaction boundaries, we need to fix up _p_ reference counts
# that keep track of open readers and writers and close any
# writable filehandles we've opened.
if self._p_blob_manager is None:
# Blobs need to always participate in transactions.
if self._p_jar is not None:
# If we are connected to a database, then we use the
# transaction manager that belongs to this connection
tm = self._p_jar.transaction_manager
else:
# If we are not connected to a database, we check whether
# we have been given an explicit transaction manager
if self._p_blob_transaction:
tm = self._p_blob_transaction
else:
# Otherwise we use the default
# transaction manager as an educated guess.
tm = transaction.manager
# Create our datamanager and join he current transaction.
dm = BlobDataManager(self, result, tm)
tm.get().join(dm)
else:
# Each blob data manager should manage only the one blob
# assigned to it. Assert that this is the case and it is the
# correct blob
assert self._p_blob_manager.blob is self
self._p_blob_manager.register_fh(result)
return result
def openDetached(self):
"""Returns a file(-like) object in read mode that can be used
outside of transaction boundaries.
"""
if self._current_filename() is None:
raise BlobError, "Blob does not exist."
if self._p_blob_writers != 0:
raise BlobError, "Already opened for writing."
return file(self._current_filename(), "rb")
# utility methods
def _current_filename(self):
# NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
# Connection._setstate
return self._p_blob_uncommitted or self._p_blob_data
def _change(self):
self._p_changed = 1
# utility methods which should not cause the object's state to be
# loaded if they are called while the object is a ghost. Thus,
# they are named with the _p_ convention and only operate against
# other _p_ instance attributes. We conventionally name these methods
# and attributes with a _p_blob prefix.
def _p_blob_clear(self):
self._p_blob_readers = 0
self._p_blob_writers = 0
def _p_blob_decref(self, mode):
if mode.startswith('r') or mode == 'U':
self._p_blob_readers = max(0, self._p_blob_readers - 1)
elif mode.startswith('w') or mode.startswith('a'):
self._p_blob_writers = max(0, self._p_blob_writers - 1)
else:
raise AssertionError, 'Unknown mode %s' % mode
def _p_blob_refcounts(self):
# used by unit tests
return self._p_blob_readers, self._p_blob_writers
class BlobDataManager:
"""Special data manager to handle transaction boundaries for blobs.
Blobs need some special care-taking on transaction boundaries. As
a) the ghost objects might get reused, the _p_reader and _p_writer
refcount attributes must be set to a consistent state
b) the file objects might get passed out of the thread/transaction
and must deny any relationship to the original blob.
c) writable blob filehandles must be closed at the end of a txn so
as to not allow reuse between two transactions.
"""
zope.interface.implements(transaction.interfaces.IDataManager)
def __init__(self, blob, filehandle, tm):
self.blob = blob
self.transaction = tm.get()
# we keep a weakref to the file handle because we don't want to
# keep it alive if all other references to it die (e.g. in the
# case it's opened without assigning it to a name).
self.fhrefs = utils.WeakSet()
self.register_fh(filehandle)
self.sortkey = time.time()
self.prepared = False
# Blob specific methods
def register_fh(self, filehandle):
self.fhrefs.add(filehandle)
def _remove_uncommitted_data(self):
self.blob._p_blob_clear()
self.fhrefs.map(lambda fhref: fhref.close())
if self.blob._p_blob_uncommitted is not None and \
os.path.exists(self.blob._p_blob_uncommitted):
os.unlink(self.blob._p_blob_uncommitted)
self.blob._p_blob_uncommitted = None
# IDataManager
def tpc_begin(self, transaction):
if self.prepared:
raise TypeError('Already prepared')
self._checkTransaction(transaction)
self.prepared = True
self.transaction = transaction
self.fhrefs.map(lambda fhref: fhref.close())
def commit(self, transaction):
if not self.prepared:
raise TypeError('Not prepared to commit')
self._checkTransaction(transaction)
self.transaction = None
self.prepared = False
self.blob._p_blob_clear()
def abort(self, transaction):
self.tpc_abort(transaction)
def tpc_abort(self, transaction):
self._checkTransaction(transaction)
if self.transaction is not None:
self.transaction = None
self.prepared = False
self._remove_uncommitted_data()
def tpc_finish(self, transaction):
pass
def tpc_vote(self, transaction):
pass
def sortKey(self):
return self.sortkey
def _checkTransaction(self, transaction):
if (self.transaction is not None and
self.transaction is not transaction):
raise TypeError("Transaction missmatch",
transaction, self.transaction)
class BlobFile(file):
"""A BlobFile that holds a file handle to actual blob data.
It is a file that can be used within a transaction boundary; a BlobFile is
just a Python file object, we only override methods which cause a change to
blob data in order to call methods on our 'parent' persistent blob object
signifying that the change happened.
"""
# XXX these files should be created in the same partition as
# the storage later puts them to avoid copying them ...
def __init__(self, name, mode, blob):
super(BlobFile, self).__init__(name, mode)
self.blob = blob
self.close_called = False
def write(self, data):
super(BlobFile, self).write(data)
self.blob._change()
def writelines(self, lines):
super(BlobFile, self).writelines(lines)
self.blob._change()
def truncate(self, size=0):
super(BlobFile, self).truncate(size)
self.blob._change()
def close(self):
# we don't want to decref twice
if not self.close_called:
self.blob._p_blob_decref(self.mode)
self.close_called = True
super(BlobFile, self).close()
def __del__(self):
# XXX we need to ensure that the file is closed at object
# expiration or our blob's refcount won't be decremented.
# This probably needs some work; I don't know if the names
# 'BlobFile' or 'super' will be available at program exit, but
# we'll assume they will be for now in the name of not
# muddying the code needlessly.
self.close()
logger = logging.getLogger('ZODB.Blobs')
_pid = str(os.getpid())
def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):
message = "(%s) %s" % (subsys, msg)
logger.log(level, message, exc_info=exc_info)
class FilesystemHelper:
# Storages that implement IBlobStorage can choose to use this
# helper class to generate and parse blob filenames. This is not
# a set-in-stone interface for all filesystem operations dealing
# with blobs and storages needn't indirect through this if they
# want to perform blob storage differently.
def __init__(self, base_dir):
self.base_dir = base_dir
def create(self):
if not os.path.exists(self.base_dir):
os.makedirs(self.base_dir, 0700)
log("Blob cache directory '%s' does not exist. "
"Created new directory." % self.base_dir,
level=logging.INFO)
def isSecure(self, path):
"""Ensure that (POSIX) path mode bits are 0700."""
return (os.stat(path).st_mode & 077) != 0
def checkSecure(self):
if not self.isSecure(self.base_dir):
log('Blob dir %s has insecure mode setting' % self.base_dir,
level=logging.WARNING)
def getPathForOID(self, oid):
"""Given an OID, return the path on the filesystem where
the blob data relating to that OID is stored.
"""
return os.path.join(self.base_dir, utils.oid_repr(oid))
def getBlobFilename(self, oid, tid):
"""Given an oid and a tid, return the full filename of the
'committed' blob file related to that oid and tid.
"""
oid_path = self.getPathForOID(oid)
filename = "%s%s" % (utils.tid_repr(tid), BLOB_SUFFIX)
return os.path.join(oid_path, filename)
def blob_mkstemp(self, oid, tid):
"""Given an oid and a tid, return a temporary file descriptor
and a related filename.
The file is guaranteed to exist on the same partition as committed
data, which is important for being able to rename the file without a
copy operation. The directory in which the file will be placed, which
is the return value of self.getPathForOID(oid), must exist before this
method may be called successfully.
"""
oidpath = self.getPathForOID(oid)
fd, name = tempfile.mkstemp(suffix='.tmp', prefix=utils.tid_repr(tid),
dir=oidpath)
return fd, name
def splitBlobFilename(self, filename):
"""Returns the oid and tid for a given blob filename.
If the filename cannot be recognized as a blob filename, (None, None)
is returned.
"""
if not filename.endswith(BLOB_SUFFIX):
return None, None
path, filename = os.path.split(filename)
oid = os.path.split(path)[1]
serial = filename[:-len(BLOB_SUFFIX)]
oid = utils.repr_to_oid(oid)
serial = utils.repr_to_oid(serial)
return oid, serial
def getOIDsForSerial(self, search_serial):
"""Return all oids related to a particular tid that exist in
blob data.
"""
oids = []
base_dir = self.base_dir
for oidpath in os.listdir(base_dir):
for filename in os.listdir(os.path.join(base_dir, oidpath)):
blob_path = os.path.join(base_dir, oidpath, filename)
oid, serial = self.splitBlobFilename(blob_path)
if search_serial == serial:
oids.append(oid)
return oids
##############################################################################
#
# Copyright (c) 2005-2006 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A ZODB storage that provides blob capabilities.
"""
__docformat__ = "reStructuredText"
import os
import shutil
import base64
import logging
from zope.interface import implements
from zope.proxy import getProxiedObject, non_overridable
from zope.proxy.decorator import SpecificationDecoratorBase
from ZODB import utils
from ZODB.Blobs.interfaces import IBlobStorage, IBlob
from ZODB.POSException import POSKeyError
from ZODB.Blobs.Blob import BLOB_SUFFIX
from ZODB.Blobs.Blob import FilesystemHelper
logger = logging.getLogger('ZODB.BlobStorage')
class BlobStorage(SpecificationDecoratorBase):
"""A storage to support blobs."""
implements(IBlobStorage)
# Proxies can't have a __dict__ so specifying __slots__ here allows
# us to have instance attributes explicitly on the proxy.
__slots__ = ('fshelper', 'dirty_oids')
def __new__(self, base_directory, storage):
return SpecificationDecoratorBase.__new__(self, storage)
def __init__(self, base_directory, storage):
# XXX Log warning if storage is ClientStorage
SpecificationDecoratorBase.__init__(self, storage)
self.fshelper = FilesystemHelper(base_directory)
self.fshelper.create()
self.fshelper.checkSecure()
self.dirty_oids = []
@non_overridable
def __repr__(self):
normal_storage = getProxiedObject(self)
return '<BlobStorage proxy for %r at %s>' % (normal_storage,
hex(id(self)))
@non_overridable
def storeBlob(self, oid, oldserial, data, blobfilename, version,
transaction):
"""Stores data that has a BLOB attached."""
serial = self.store(oid, oldserial, data, version, transaction)
assert isinstance(serial, str) # XXX in theory serials could be
# something else
# the user may not have called "open" on the blob object,
# in which case, the blob will not have a filename.
if blobfilename is not None:
self._lock_acquire()
try:
targetpath = self.fshelper.getPathForOID(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
targetname = self.fshelper.getBlobFilename(oid, serial)
os.rename(blobfilename, targetname)
# XXX if oid already in there, something is really hosed.
# The underlying storage should have complained anyway
self.dirty_oids.append((oid, serial))
finally:
self._lock_release()
return self._tid
@non_overridable
def tpc_finish(self, *arg, **kw):
# We need to override the base storage's tpc_finish instead of
# providing a _finish method because methods found on the proxied
# object aren't rebound to the proxy
getProxiedObject(self).tpc_finish(*arg, **kw)
self.dirty_oids = []
@non_overridable
def tpc_abort(self, *arg, **kw):
# We need to override the base storage's abort instead of
# providing an _abort method because methods found on the proxied object
# aren't rebound to the proxy
getProxiedObject(self).tpc_abort(*arg, **kw)
while self.dirty_oids:
oid, serial = self.dirty_oids.pop()
clean = self.fshelper.getBlobFilename(oid, serial)
if os.exists(clean):
os.unlink(clean)
@non_overridable
def loadBlob(self, oid, serial, version):
"""Return the filename where the blob file can be found.
"""
filename = self.fshelper.getBlobFilename(oid, serial)
if not os.path.exists(filename):
raise POSKeyError, "Not an existing blob."
return filename
@non_overridable
def _packUndoing(self, packtime, referencesf):
# Walk over all existing revisions of all blob files and check
# if they are still needed by attempting to load the revision
# of that object from the database. This is maybe the slowest
# possible way to do this, but it's safe.
# XXX we should be tolerant of "garbage" directories/files in
# the base_directory here.
base_dir = self.fshelper.base_dir
for oid_repr in os.listdir(base_dir):
oid = utils.repr_to_oid(oid_repr)
oid_path = os.path.join(base_dir, oid_repr)
files = os.listdir(oid_path)
files.sort()
for filename in files:
filepath = os.path.join(oid_path, filename)
whatever, serial = self.fshelper.splitBlobFilename(filepath)
try:
fn = self.fshelper.getBlobFilename(oid, serial)
self.loadSerial(oid, serial)
except POSKeyError:
os.unlink(filepath)
if not os.listdir(oid_path):
shutil.rmtree(oid_path)
@non_overridable
def _packNonUndoing(self, packtime, referencesf):
base_dir = self.fshelper.base_dir
for oid_repr in os.listdir(base_dir):
oid = utils.repr_to_oid(oid_repr)
oid_path = os.path.join(base_dir, oid_repr)
exists = True
try:
self.load(oid, None) # no version support
except (POSKeyError, KeyError):
exists = False
if exists:
files = os.listdir(oid_path)
files.sort()
latest = files[-1] # depends on ever-increasing tids
files.remove(latest)
for file in files:
os.unlink(os.path.join(oid_path, file))
else:
shutil.rmtree(oid_path)
continue
if not os.listdir(oid_path):
shutil.rmtree(oid_path)
@non_overridable
def pack(self, packtime, referencesf):
"""Remove all unused oid/tid combinations."""
unproxied = getProxiedObject(self)
# pack the underlying storage, which will allow us to determine
# which serials are current.
result = unproxied.pack(packtime, referencesf)
# perform a pack on blob data
self._lock_acquire()
try:
if unproxied.supportsUndo():
self._packUndoing(packtime, referencesf)
else:
self._packNonUndoing(packtime, referencesf)
finally:
self._lock_release()
return result
@non_overridable
def getSize(self):
"""Return the size of the database in bytes."""
orig_size = getProxiedObject(self).getSize()
blob_size = 0
base_dir = self.fshelper.base_dir
for oid in os.listdir(base_dir):
for serial in os.listdir(os.path.join(base_dir, oid)):
if not serial.endswith(BLOB_SUFFIX):
continue
file_path = os.path.join(base_dir, oid, serial)
blob_size += os.stat(file_path).st_size
return orig_size + blob_size
@non_overridable
def undo(self, serial_id, transaction):
undo_serial, keys = getProxiedObject(self).undo(serial_id, transaction)
# serial_id is the transaction id of the txn that we wish to undo.
# "undo_serial" is the transaction id of txn in which the undo is
# performed. "keys" is the list of oids that are involved in the
# undo transaction.
# The serial_id is assumed to be given to us base-64 encoded
# (belying the web UI legacy of the ZODB code :-()
serial_id = base64.decodestring(serial_id+'\n')
self._lock_acquire()
try:
# we get all the blob oids on the filesystem related to the
# transaction we want to undo.
for oid in self.fshelper.getOIDsForSerial(serial_id):
# we want to find the serial id of the previous revision
# of this blob object.
load_result = self.loadBefore(oid, serial_id)
if load_result is None:
# There was no previous revision of this blob
# object. The blob was created in the transaction
# represented by serial_id. We copy the blob data
# to a new file that references the undo
# transaction in case a user wishes to undo this
# undo.
orig_fn = self.fshelper.getBlobFilename(oid, serial_id)
new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
else:
# A previous revision of this blob existed before the
# transaction implied by "serial_id". We copy the blob
# data to a new file that references the undo transaction
# in case a user wishes to undo this undo.
data, serial_before, serial_after = load_result
orig_fn = self.fshelper.getBlobFilename(oid, serial_before)
new_fn = self.fshelper.getBlobFilename(oid, undo_serial)
orig = open(orig_fn, "r")
new = open(new_fn, "wb")
utils.cp(orig, new)
orig.close()
new.close()
self.dirty_oids.append((oid, undo_serial))
finally:
self._lock_release()
return undo_serial, keys
Production
- Ensure we detect and replay a failed txn involving blobs forward or
backward at startup.
- Check for windows compatibility
- Check cache-compatibility with shared network filesystems (mcdonc)
Far future
More options for blob directory structures (e.g. dirstorages
bushy/chunky/lawn/flat).
Make the ClientStorage support minimizing the blob cache. (Idea: LRU
principle via mstat access time and a size-based threshold) currently).
Make blobs able to efficiently consume existing files from the filesystem
Savepoint support
=================
- A savepoint represents the whole state of the data at a certain point in
time
- Need special storage for blob savepointing (in the spirit of tmpstorage)
- What belongs to the state of the data?
- Data contained in files at that point in time
- File handles are complex because they might be referred to from various
places. We would have to introduce an abstraction layer to allow
switching them around...
Simpler solution: :
Goal: Handle storage and retrieval of binary large objects efficiently,
transactionally, and transparently.
Measure:
- Don't block ZServer on uploads and downloads
- Don't hold BLOBS in memory or cache if not necessary (LRU caches tend
to break if we split BLOBs in lot of small objects. Size-based caches
tend to break on single large objects)
- Transparent for other systems, support normal ZODB operations.
Comments:
- Cache: BLOBs could be cached in a seperate "BLOB" space, e.g. in
single files
- Be storage independent?
- Memory efficiency: Storge.load() currently holds all data of an
object in a string.
Steps:
- simple aspects:
- blobs should be known by zodb
- storages, esp. clientstorage must be able to recognize blobs
- to avoid putting blob data into the client cache.
- blob data mustn't end up in the object cache
- blob object and blob data need to be handled separately
- blob data on client is stored in temporary files
- complicated aspects
- temporary files holding blob data could server as a
separated cache for blob data
- storage / zodb api change
Restrictions:
- a particular BLOB instance can't be open for read _and_ write at
the same time
- Allowed: N readers, no writers; 1 writer, no readers
- Reason:
- a writable filehandle opened via a BLOB's 'open' method has a
lifetime tied to the transaction in which the 'open' method was
called. We do this in order to prevent changes to blob data
from "bleeding over" between transactions.
- Data has been committed? -> File(name) for commited data available
- .open("r") on fresh loaded blob returns committed data
- first .open("w") -> new empty file for uncommitted data
- .open("a") or .open("r+"), we copy existing data into file for
uncommitted data
- if uncommitted data exists, subsequent .open("*") will use the
uncommitted data
- if opened for writing, the object is marked as changed
(optimiziation possible)
- connections want to recognize blobs on transaction boundaries
class BlobError(Exception):
pass
from zope.interface import Interface
class IBlob(Interface):
"""A BLOB supports efficient handling of large data within ZODB."""
def open(mode):
"""Returns a file(-like) object for handling the blob data.
mode: Mode to open the file with. Possible values: r,w,r+,a
"""
def openDetached():
"""Returns a file(-like) object in read mode that can be used
outside of transaction boundaries.
The file handle returned by this method is read-only and at the
beginning of the file.
The handle is not attached to the blob and can be used outside of a
transaction.
"""
# XXX need a method to initialize the blob from the storage
# this means a) setting the _p_blob_data filename and b) putting
# the current data in that file
class IBlobStorage(Interface):
"""A storage supporting BLOBs."""
def storeBlob(oid, oldserial, data, blob, version, transaction):
"""Stores data that has a BLOB attached."""
def loadBlob(oid, serial, version):
"""Return the filename of the Blob data responding to this OID and
serial.
Returns a filename or None if no Blob data is connected with this OID.
Raises POSKeyError if the blobfile cannot be found.
"""
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
ZODB Blob support
=================
You create a blob like this:
>>> from ZODB.Blobs.Blob import Blob
>>> myblob = Blob()
A blob implements the IBlob interface:
>>> from ZODB.Blobs.interfaces import IBlob
>>> IBlob.providedBy(myblob)
True
Opening a new Blob for reading fails:
>>> myblob.open("r")
Traceback (most recent call last):
...
BlobError: Blob does not exist.
But we can write data to a new Blob by opening it for writing:
>>> f = myblob.open("w")
>>> f.write("Hi, Blob!")
If we try to open a Blob again while it is open for writing, we get an error:
>>> myblob.open("r")
Traceback (most recent call last):
...
BlobError: Already opened for writing.
We can close the file:
>>> f.close()
Now we can open it for reading:
>>> f2 = myblob.open("r")
And we get the data back:
>>> f2.read()
'Hi, Blob!'
If we want to, we can open it again:
>>> f3 = myblob.open("r")
>>> f3.read()
'Hi, Blob!'
But we can't open it for writing, while it is opened for reading:
>>> myblob.open("a")
Traceback (most recent call last):
...
BlobError: Already opened for reading.
Before we can write, we have to close the readers:
>>> f2.close()
>>> f3.close()
Now we can open it for writing again and e.g. append data:
>>> f4 = myblob.open("a")
>>> f4.write("\nBlob is fine.")
>>> f4.close()
Now we can read it:
>>> f4a = myblob.open("r")
>>> f4a.read()
'Hi, Blob!\nBlob is fine.'
>>> f4a.close()
You shouldn't need to explicitly close a blob unless you hold a reference
to it via a name. If the first line in the following test kept a reference
around via a name, the second call to open it in a writable mode would fail
with a BlobError, but it doesn't.
>>> myblob.open("r+").read()
'Hi, Blob!\nBlob is fine.'
>>> f4b = myblob.open("a")
>>> f4b.close()
We can read lines out of the blob too:
>>> f5 = myblob.open("r")
>>> f5.readline()
'Hi, Blob!\n'
>>> f5.readline()
'Blob is fine.'
>>> f5.close()
We can seek to certain positions in a blob and read portions of it:
>>> f6 = myblob.open('r')
>>> f6.seek(4)
>>> int(f6.tell())
4
>>> f6.read(5)
'Blob!'
>>> f6.close()
We can use the object returned by a blob open call as an iterable:
>>> f7 = myblob.open('r')
>>> for line in f7:
... print line
Hi, Blob!
<BLANKLINE>
Blob is fine.
>>> f7.close()
We can truncate a blob:
>>> f8 = myblob.open('a')
>>> f8.truncate(0)
>>> f8.close()
>>> f8 = myblob.open('r')
>>> f8.read()
''
>>> f8.close()
We can explicitly open Blobs in the different modified modes:
>>> f9 = myblob.open("rb")
>>> f9.mode
'rb'
>>> f9.close()
We can specify the tempdir that blobs use to keep uncommitted data by
modifying the ZODB_BLOB_TEMPDIR environment variable:
>>> import os, tempfile, shutil
>>> tempdir = tempfile.mkdtemp()
>>> os.environ['ZODB_BLOB_TEMPDIR'] = tempdir
>>> myblob = Blob()
>>> len(os.listdir(tempdir))
0
>>> f = myblob.open('w')
>>> len(os.listdir(tempdir))
1
>>> f.close()
>>> shutil.rmtree(tempdir)
>>> del os.environ['ZODB_BLOB_TEMPDIR']
Some cleanup in this test is needed:
>>> import transaction
>>> transaction.get().abort()
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Connection support for Blobs tests
==================================
Connections handle Blobs specially. To demonstrate that, we first need a Blob with some data:
>>> from ZODB.Blobs.interfaces import IBlob
>>> from ZODB.Blobs.Blob import Blob
>>> import transaction
>>> blob = Blob()
>>> data = blob.open("w")
>>> data.write("I'm a happy Blob.")
>>> data.close()
We also need a database with a blob supporting storage:
>>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.Blobs.BlobStorage import BlobStorage
>>> from ZODB.DB import DB
>>> from tempfile import mkdtemp
>>> base_storage = MappingStorage("test")
>>> blob_dir = mkdtemp()
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
Putting a Blob into a Connection works like every other object:
>>> connection = database.open()
>>> root = connection.root()
>>> root['myblob'] = blob
>>> transaction.commit()
We can also commit a transaction that seats a blob into place without
calling the blob's open method (this currently fails):
>>> nothing = transaction.begin()
>>> anotherblob = Blob()
>>> root['anotherblob'] = anotherblob
>>> nothing = transaction.commit()
Getting stuff out of there works similar:
>>> connection2 = database.open()
>>> root = connection2.root()
>>> blob2 = root['myblob']
>>> IBlob.providedBy(blob2)
True
>>> blob2.open("r").read()
"I'm a happy Blob."
You can't put blobs into a database that has uses a Non-Blob-Storage, though:
>>> no_blob_storage = MappingStorage()
>>> database2 = DB(no_blob_storage)
>>> connection3 = database2.open()
>>> root = connection3.root()
>>> root['myblob'] = Blob()
>>> transaction.commit() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
Unsupported: Storing Blobs in <ZODB.MappingStorage.MappingStorage instance at ...> is not supported.
While we are testing this, we don't need the storage directory and
databases anymore:
>>> import shutil
>>> shutil.rmtree(blob_dir)
>>> transaction.abort()
>>> database.close()
>>> database2.close()
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Import/export support for blob data
===================================
Set up:
>>> from ZODB.FileStorage import FileStorage
>>> from ZODB.Blobs.BlobStorage import BlobStorage
>>> from ZODB.Blobs.Blob import Blob
>>> from ZODB.DB import DB
>>> from persistent.mapping import PersistentMapping
>>> import shutil
>>> import transaction
>>> from tempfile import mkdtemp, mktemp
>>> storagefile1 = mktemp()
>>> blob_dir1 = mkdtemp()
>>> storagefile2 = mktemp()
>>> blob_dir2 = mkdtemp()
We need an database with an undoing blob supporting storage:
>>> base_storage1 = FileStorage(storagefile1)
>>> blob_storage1 = BlobStorage(blob_dir1, base_storage1)
>>> base_storage2 = FileStorage(storagefile2)
>>> blob_storage2 = BlobStorage(blob_dir2, base_storage2)
>>> database1 = DB(blob_storage1)
>>> database2 = DB(blob_storage2)
Create our root object for database1:
>>> connection1 = database1.open()
>>> root1 = connection1.root()
Put a couple blob objects in our database1 and on the filesystem:
>>> import time, os
>>> nothing = transaction.begin()
>>> tid = blob_storage1._tid
>>> data1 = 'x'*100000
>>> blob1 = Blob()
>>> blob1.open('w').write(data1)
>>> data2 = 'y'*100000
>>> blob2 = Blob()
>>> blob2.open('w').write(data2)
>>> d = PersistentMapping({'blob1':blob1, 'blob2':blob2})
>>> root1['blobdata'] = d
>>> transaction.commit()
Export our blobs from a database1 connection:
>>> conn = root1['blobdata']._p_jar
>>> oid = root1['blobdata']._p_oid
>>> exportfile = mktemp()
>>> nothing = connection1.exportFile(oid, exportfile)
Import our exported data into database2:
>>> connection2 = database2.open()
>>> root2 = connection2.root()
>>> nothing = transaction.begin()
>>> data = root2._p_jar.importFile(exportfile)
>>> root2['blobdata'] = data
>>> transaction.commit()
Make sure our data exists:
>>> items1 = root1['blobdata']
>>> items2 = root2['blobdata']
>>> bool(items1.keys() == items2.keys())
True
>>> items1['blob1'].open().read() == items2['blob1'].open().read()
True
>>> items1['blob2'].open().read() == items2['blob2'].open().read()
True
>>> transaction.get().abort()
Clean up our blob directory:
>>> base_storage1.close()
>>> base_storage2.close()
>>> shutil.rmtree(blob_dir1)
>>> shutil.rmtree(blob_dir2)
>>> os.unlink(exportfile)
>>> os.unlink(storagefile1)
>>> os.unlink(storagefile1+".index")
>>> os.unlink(storagefile1+".tmp")
>>> os.unlink(storagefile2)
>>> os.unlink(storagefile2+".index")
>>> os.unlink(storagefile2+".tmp")
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Packing support for blob data
=============================
Set up:
>>> from ZODB.FileStorage import FileStorage
>>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.serialize import referencesf
>>> from ZODB.Blobs.BlobStorage import BlobStorage
>>> from ZODB.Blobs.Blob import Blob
>>> from ZODB import utils
>>> from ZODB.DB import DB
>>> import shutil
>>> import transaction
>>> from tempfile import mkdtemp, mktemp
>>> storagefile = mktemp()
>>> blob_dir = mkdtemp()
A helper method to assure a unique timestamp across multiple platforms. This
method also makes sure that after retrieving a timestamp that was *before* a
transaction was committed, that at least one second passes so the packing time
actually is before the commit time.
>>> import time
>>> def new_time():
... now = new_time = time.time()
... while new_time <= now:
... new_time = time.time()
... time.sleep(1)
... return new_time
UNDOING
=======
We need a database with an undoing blob supporting storage:
>>> base_storage = FileStorage(storagefile)
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
Create our root object:
>>> connection1 = database.open()
>>> root = connection1.root()
Put some revisions of a blob object in our database and on the filesystem:
>>> import os
>>> tids = []
>>> times = []
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> blob = Blob()
>>> blob.open('w').write('this is blob data 0')
>>> root['blob'] = blob
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> root['blob'].open('w').write('this is blob data 1')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> root['blob'].open('w').write('this is blob data 2')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> root['blob'].open('w').write('this is blob data 3')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> root['blob'].open('w').write('this is blob data 4')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> oid = root['blob']._p_oid
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Do a pack to the slightly before the first revision was written:
>>> packtime = times[0]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Do a pack to the slightly before the second revision was written:
>>> packtime = times[1]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Do a pack to the slightly before the third revision was written:
>>> packtime = times[2]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, True, True, True, True]
Do a pack to the slightly before the fourth revision was written:
>>> packtime = times[3]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, True, True, True]
Do a pack to the slightly before the fifth revision was written:
>>> packtime = times[4]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, True, True]
Do a pack to now:
>>> packtime = new_time()
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, False, True]
Delete the object and do a pack, it should get rid of the most current
revision as well as the entire directory:
>>> nothing = transaction.begin()
>>> del root['blob']
>>> transaction.commit()
>>> packtime = new_time()
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, False, False]
>>> os.path.exists(os.path.split(fns[0])[0])
False
Clean up our blob directory and database:
>>> shutil.rmtree(blob_dir)
>>> base_storage.close()
>>> os.unlink(storagefile)
>>> os.unlink(storagefile+".index")
>>> os.unlink(storagefile+".tmp")
>>> os.unlink(storagefile+".old")
NON-UNDOING
===========
We need an database with a NON-undoing blob supporting storage:
>>> base_storage = MappingStorage('storage')
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
Create our root object:
>>> connection1 = database.open()
>>> root = connection1.root()
Put some revisions of a blob object in our database and on the filesystem:
>>> import time, os
>>> tids = []
>>> times = []
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> blob = Blob()
>>> blob.open('w').write('this is blob data 0')
>>> root['blob'] = blob
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> root['blob'].open('w').write('this is blob data 1')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> root['blob'].open('w').write('this is blob data 2')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> root['blob'].open('w').write('this is blob data 3')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> nothing = transaction.begin()
>>> times.append(new_time())
>>> root['blob'].open('w').write('this is blob data 4')
>>> transaction.commit()
>>> tids.append(blob_storage._tid)
>>> oid = root['blob']._p_oid
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
>>> [ os.path.exists(x) for x in fns ]
[True, True, True, True, True]
Get our blob filenames for this oid.
>>> fns = [ blob_storage.fshelper.getBlobFilename(oid, x) for x in tids ]
Do a pack to the slightly before the first revision was written:
>>> packtime = times[0]
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, False, True]
Do a pack to now:
>>> packtime = new_time()
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, False, True]
Delete the object and do a pack, it should get rid of the most current
revision as well as the entire directory:
>>> nothing = transaction.begin()
>>> del root['blob']
>>> transaction.commit()
>>> packtime = new_time()
>>> blob_storage.pack(packtime, referencesf)
>>> [ os.path.exists(x) for x in fns ]
[False, False, False, False, False]
>>> os.path.exists(os.path.split(fns[0])[0])
False
Clean up our blob directory:
>>> shutil.rmtree(blob_dir)
##############################################################################
#
# Copyright (c) 2004-2006 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import tempfile, shutil, unittest
import os
from ZODB.tests.testConfig import ConfigTestBase
from ZConfig import ConfigurationSyntaxError
class BlobConfigTestBase(ConfigTestBase):
def setUp(self):
super(BlobConfigTestBase, self).setUp()
self.blob_dir = tempfile.mkdtemp()
def tearDown(self):
super(BlobConfigTestBase, self).tearDown()
shutil.rmtree(self.blob_dir)
class ZODBBlobConfigTest(BlobConfigTestBase):
def test_map_config1(self):
self._test(
"""
<zodb>
<blobstorage>
blob-dir %s
<mappingstorage/>
</blobstorage>
</zodb>
""" % self.blob_dir)
def test_file_config1(self):
path = tempfile.mktemp()
self._test(
"""
<zodb>
<blobstorage>
blob-dir %s
<filestorage>
path %s
</filestorage>
</blobstorage>
</zodb>
""" %(self.blob_dir, path))
os.unlink(path)
os.unlink(path+".index")
os.unlink(path+".tmp")
def test_blob_dir_needed(self):
self.assertRaises(ConfigurationSyntaxError,
self._test,
"""
<zodb>
<blobstorage>
<mappingstorage/>
</blobstorage>
</zodb>
""")
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ZODBBlobConfigTest))
return suite
if __name__ == '__main__':
unittest.main(defaultTest = 'test_suite')
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from zope.testing.doctestunit import DocFileSuite
def test_suite():
return DocFileSuite("basic.txt", "connection.txt", "transaction.txt",
"packing.txt", "importexport.txt")
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import unittest
import tempfile
import os
import shutil
import base64
from ZODB.FileStorage import FileStorage
from ZODB.Blobs.BlobStorage import BlobStorage
from ZODB.Blobs.Blob import Blob
from ZODB.DB import DB
import transaction
from ZODB.Blobs.Blob import Blob
from ZODB import utils
class BlobUndoTests(unittest.TestCase):
def setUp(self):
self.storagefile = tempfile.mktemp()
self.blob_dir = tempfile.mkdtemp()
def tearDown(self):
try:
os.unlink(self.storagefile)
except (OSError, IOError):
pass
shutil.rmtree(self.blob_dir)
def testUndoWithoutPreviousVersion(self):
base_storage = FileStorage(self.storagefile)
blob_storage = BlobStorage(self.blob_dir, base_storage)
database = DB(blob_storage)
connection = database.open()
root = connection.root()
transaction.begin()
root['blob'] = Blob()
transaction.commit()
serial = base64.encodestring(blob_storage._tid)
# undo the creation of the previously added blob
transaction.begin()
database.undo(serial, blob_storage._transaction)
transaction.commit()
connection.close()
connection = database.open()
root = connection.root()
# the blob footprint object should exist no longer
self.assertRaises(KeyError, root.__getitem__, 'blob')
def testUndo(self):
base_storage = FileStorage(self.storagefile)
blob_storage = BlobStorage(self.blob_dir, base_storage)
database = DB(blob_storage)
connection = database.open()
root = connection.root()
transaction.begin()
blob = Blob()
blob.open('w').write('this is state 1')
root['blob'] = blob
transaction.commit()
transaction.begin()
blob = root['blob']
blob.open('w').write('this is state 2')
transaction.commit()
transaction.begin()
blob = root['blob']
self.assertEqual(blob.open('r').read(), 'this is state 2')
transaction.abort()
serial = base64.encodestring(blob_storage._tid)
transaction.begin()
blob_storage.undo(serial, blob_storage._transaction)
transaction.commit()
transaction.begin()
blob = root['blob']
self.assertEqual(blob.open('r').read(), 'this is state 1')
transaction.abort()
def testRedo(self):
base_storage = FileStorage(self.storagefile)
blob_storage = BlobStorage(self.blob_dir, base_storage)
database = DB(blob_storage)
connection = database.open()
root = connection.root()
blob = Blob()
transaction.begin()
blob.open('w').write('this is state 1')
root['blob'] = blob
transaction.commit()
transaction.begin()
blob = root['blob']
blob.open('w').write('this is state 2')
transaction.commit()
serial = base64.encodestring(blob_storage._tid)
transaction.begin()
database.undo(serial)
transaction.commit()
transaction.begin()
blob = root['blob']
self.assertEqual(blob.open('r').read(), 'this is state 1')
transaction.abort()
serial = base64.encodestring(blob_storage._tid)
transaction.begin()
database.undo(serial)
transaction.commit()
transaction.begin()
blob = root['blob']
self.assertEqual(blob.open('r').read(), 'this is state 2')
transaction.abort()
def testRedoOfCreation(self):
base_storage = FileStorage(self.storagefile)
blob_storage = BlobStorage(self.blob_dir, base_storage)
database = DB(blob_storage)
connection = database.open()
root = connection.root()
blob = Blob()
transaction.begin()
blob.open('w').write('this is state 1')
root['blob'] = blob
transaction.commit()
serial = base64.encodestring(blob_storage._tid)
transaction.begin()
database.undo(serial)
transaction.commit()
self.assertRaises(KeyError, root.__getitem__, 'blob')
serial = base64.encodestring(blob_storage._tid)
transaction.begin()
database.undo(serial)
transaction.commit()
transaction.begin()
blob = root['blob']
self.assertEqual(blob.open('r').read(), 'this is state 1')
transaction.abort()
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(BlobUndoTests))
return suite
if __name__ == '__main__':
unittest.main(defaultTest = 'test_suite')
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Transaction support for Blobs
=============================
We need a database with a blob supporting storage:
>>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.Blobs.BlobStorage import BlobStorage
>>> from ZODB.DB import DB
>>> import transaction
>>> import tempfile
>>> from tempfile import mkdtemp
>>> base_storage = MappingStorage("test")
>>> blob_dir = mkdtemp()
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
>>> connection1 = database.open()
>>> root1 = connection1.root()
>>> from ZODB.Blobs.Blob import Blob
Putting a Blob into a Connection works like any other Persistent object:
>>> blob1 = Blob()
>>> blob1.open('w').write('this is blob 1')
>>> root1['blob1'] = blob1
>>> transaction.commit()
Aborting a transaction involving a blob write cleans up uncommitted
file data:
>>> dead_blob = Blob()
>>> dead_blob.open('w').write('this is a dead blob')
>>> root1['dead_blob'] = dead_blob
>>> fname = dead_blob._p_blob_uncommitted
>>> import os
>>> os.path.exists(fname)
True
>>> transaction.abort()
>>> os.path.exists(fname)
False
Opening a blob gives us a filehandle. Getting data out of the
resulting filehandle is accomplished via the filehandle's read method:
>>> connection2 = database.open()
>>> root2 = connection2.root()
>>> blob1a = root2['blob1']
>>> blob1a._p_blob_refcounts()
(0, 0)
>>>
>>> blob1afh1 = blob1a.open("r")
>>> blob1afh1.read()
'this is blob 1'
>>> # The filehandle keeps a reference to its blob object
>>> blob1afh1.blob._p_blob_refcounts()
(1, 0)
Let's make another filehandle for read only to blob1a, this should bump
up its refcount by one, and each file handle has a reference to the
(same) underlying blob:
>>> blob1afh2 = blob1a.open("r")
>>> blob1afh2.blob._p_blob_refcounts()
(2, 0)
>>> blob1afh1.blob._p_blob_refcounts()
(2, 0)
>>> blob1afh2.blob is blob1afh1.blob
True
Let's close the first filehandle we got from the blob, this should decrease
its refcount by one:
>>> blob1afh1.close()
>>> blob1a._p_blob_refcounts()
(1, 0)
Let's abort this transaction, and ensure that the filehandles that we
opened are now closed and that the filehandle refcounts on the blob
object are cleared.
>>> transaction.abort()
>>> blob1afh1.blob._p_blob_refcounts()
(0, 0)
>>> blob1afh2.blob._p_blob_refcounts()
(0, 0)
>>> blob1a._p_blob_refcounts()
(0, 0)
>>> blob1afh2.read()
Traceback (most recent call last):
...
ValueError: I/O operation on closed file
If we open a blob for append, its write refcount should be nonzero.
Additionally, writing any number of bytes to the blobfile should
result in the blob being marked "dirty" in the connection (we just
aborted above, so the object should be "clean" when we start):
>>> bool(blob1a._p_changed)
False
>>> blob1a.open('r').read()
'this is blob 1'
>>> blob1afh3 = blob1a.open('a')
>>> blob1afh3.write('woot!')
>>> blob1a._p_blob_refcounts()
(0, 1)
>>> bool(blob1a._p_changed)
True
We can open more than one blob object during the course of a single
transaction:
>>> blob2 = Blob()
>>> blob2.open('w').write('this is blob 3')
>>> root2['blob2'] = blob2
>>> transaction.commit()
>>> blob2._p_blob_refcounts()
(0, 0)
>>> blob1._p_blob_refcounts()
(0, 0)
Since we committed the current transaction above, the aggregate
changes we've made to blob, blob1a (these refer to the same object) and
blob2 (a different object) should be evident:
>>> blob1.open('r').read()
'this is blob 1woot!'
>>> blob1a.open('r').read()
'this is blob 1woot!'
>>> blob2.open('r').read()
'this is blob 3'
We shouldn't be able to persist a blob filehandle at commit time
(although the exception which is raised when an object cannot be
pickled appears to be particulary unhelpful for casual users at the
moment):
>>> root1['wontwork'] = blob1.open('r')
>>> transaction.commit()
Traceback (most recent call last):
...
TypeError: coercing to Unicode: need string or buffer, BlobFile found
Abort for good measure:
>>> transaction.abort()
Attempting to change a blob simultaneously from two different
connections should result in a write conflict error.
>>> tm1 = transaction.TransactionManager()
>>> tm2 = transaction.TransactionManager()
>>> root3 = database.open(transaction_manager=tm1).root()
>>> root4 = database.open(transaction_manager=tm2).root()
>>> blob1c3 = root3['blob1']
>>> blob1c4 = root4['blob1']
>>> blob1c3fh1 = blob1c3.open('a')
>>> blob1c4fh1 = blob1c4.open('a')
>>> blob1c3fh1.write('this is from connection 3')
>>> blob1c4fh1.write('this is from connection 4')
>>> tm1.get().commit()
>>> root3['blob1'].open('r').read()
'this is blob 1woot!this is from connection 3'
>>> tm2.get().commit()
Traceback (most recent call last):
...
ConflictError: database conflict error (oid 0x01, class ZODB.Blobs.Blob.Blob)
After the conflict, the winning transaction's result is visible on both
connections:
>>> root3['blob1'].open('r').read()
'this is blob 1woot!this is from connection 3'
>>> tm2.get().abort()
>>> root4['blob1'].open('r').read()
'this is blob 1woot!this is from connection 3'
BlobStorages implementation of getSize() includes the blob data and adds it to
the underlying storages result of getSize():
>>> underlying_size = base_storage.getSize()
>>> blob_size = blob_storage.getSize()
>>> blob_size - underlying_size
91L
Savepoints and Blobs
--------------------
We do support optimistic savepoints :
>>> connection5 = database.open()
>>> root5 = connection5.root()
>>> blob = Blob()
>>> blob_fh = blob.open("wb")
>>> blob_fh.write("I'm a happy blob.")
>>> blob_fh.close()
>>> root5['blob'] = blob
>>> transaction.commit()
>>> root5['blob'].open("rb").read()
"I'm a happy blob."
>>> blob_fh = root5['blob'].open("a")
>>> blob_fh.write(" And I'm singing.")
>>> blob_fh.close()
>>> root5['blob'].open("rb").read()
"I'm a happy blob. And I'm singing."
>>> savepoint = transaction.savepoint(optimistic=True)
>>> root5['blob'].open("rb").read()
"I'm a happy blob. And I'm singing."
>>> transaction.get().commit()
We do not support non-optimistic savepoints:
>>> blob_fh = root5['blob'].open("a")
>>> blob_fh.write(" And the weather is beautiful.")
>>> blob_fh.close()
>>> root5['blob'].open("rb").read()
"I'm a happy blob. And I'm singing. And the weather is beautiful."
>>> savepoint = transaction.savepoint() # doctest: +ELLIPSIS
Traceback (most recent call last):
...
TypeError: ('Savepoints unsupported', <ZODB.Blobs.Blob.BlobDataManager instance at 0x...>)
>>> transaction.abort()
Reading Blobs outside of a transaction
--------------------------------------
If you want to read from a Blob outside of transaction boundaries (e.g. to
stream a file to the browser), you can use the openDetached() method:
>>> connection6 = database.open()
>>> root6 = connection6.root()
>>> blob = Blob()
>>> blob_fh = blob.open("wb")
>>> blob_fh.write("I'm a happy blob.")
>>> blob_fh.close()
>>> root6['blob'] = blob
>>> transaction.commit()
>>> blob.openDetached().read()
"I'm a happy blob."
Of course, that doesn't work for empty blobs
>>> blob = Blob()
>>> blob.openDetached()
Traceback (most recent call last):
...
BlobError: Blob does not exist.
nor when the Blob is already opened for writing:
>>> blob = Blob()
>>> blob_fh = blob.open("wb")
>>> blob.openDetached()
Traceback (most recent call last):
...
BlobError: Already opened for writing.
It does work when the transaction was aborted, though:
>>> blob = Blob()
>>> blob_fh = blob.open("wb")
>>> blob_fh.write("I'm a happy blob.")
>>> blob_fh.close()
>>> root6['blob'] = blob
>>> transaction.commit()
>>> blob_fh = blob.open("wb")
>>> blob_fh.write("And I'm singing.")
>>> blob_fh.close()
>>> transaction.abort()
>>> blob.openDetached().read()
"I'm a happy blob."
Teardown
--------
We don't need the storage directory and databases anymore:
>>> import shutil
>>> shutil.rmtree(blob_dir)
>>> tm1.get().abort()
>>> tm2.get().abort()
>>> database.close()
......@@ -20,6 +20,8 @@ import sys
import tempfile
import threading
import warnings
import os
import shutil
from time import time
from persistent import PickleCache
......@@ -27,6 +29,8 @@ from persistent import PickleCache
# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.Blobs.interfaces import IBlob, IBlobStorage
from ZODB.Blobs.BlobStorage import BlobStorage
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
from transaction.interfaces import ISynchronizer
......@@ -39,8 +43,11 @@ from ZODB.ExportImport import ExportImport
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.POSException import Unsupported
from ZODB.POSException import POSKeyError
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
from ZODB import utils
global_reset_counter = 0
......@@ -591,7 +598,29 @@ class Connection(ExportImport, object):
raise ConflictError(object=obj)
self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj
s = self._storage.store(oid, serial, p, self._version, transaction)
# This is a workaround to calling IBlob.proivdedBy(obj). Calling
# Interface.providedBy on a object to be stored can invertible
# set the '__providedBy__' and '__implemented__' attributes on the
# object. This interferes the storing of the object by requesting
# that the values of these objects should be stored with the ZODB.
providedBy = getattr(obj, '__providedBy__', None)
if providedBy is not None and IBlob in providedBy:
if not IBlobStorage.providedBy(self._storage):
raise Unsupported(
"Storing Blobs in %s is not supported." %
repr(self._storage))
s = self._storage.storeBlob(oid, serial, p,
obj._p_blob_uncommitted,
self._version, transaction)
# we invalidate the object here in order to ensure
# that that the next attribute access of its name
# unghostify it, which will cause its blob data
# to be reattached "cleanly"
obj._p_invalidate()
else:
s = self._storage.store(oid, serial, p, self._version,
transaction)
self._store_count += 1
# Put the object in the cache before handling the
# response, just in case the response contains the
......@@ -830,6 +859,13 @@ class Connection(ExportImport, object):
self._reader.setGhostState(obj, p)
obj._p_serial = serial
# Blob support
providedBy = getattr(obj, '__providedBy__', None)
if providedBy is not None and IBlob in providedBy:
obj._p_blob_uncommitted = None
obj._p_blob_data = \
self._storage.loadBlob(obj._p_oid, serial, self._version)
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
......@@ -1049,8 +1085,9 @@ class Connection(ExportImport, object):
def savepoint(self):
if self._savepoint_storage is None:
self._savepoint_storage = TmpStore(self._version,
self._normal_storage)
# XXX what to do about IBlobStorages?
tmpstore = TmpStore(self._version, self._normal_storage)
self._savepoint_storage = tmpstore
self._storage = self._savepoint_storage
self._creating.clear()
......@@ -1082,7 +1119,7 @@ class Connection(ExportImport, object):
self._storage = self._normal_storage
self._savepoint_storage = None
self._log.debug("Commiting savepoints of size %s", src.getSize())
self._log.debug("Committing savepoints of size %s", src.getSize())
oids = src.index.keys()
# Copy invalidating and creating info from temporary storage:
......@@ -1091,10 +1128,20 @@ class Connection(ExportImport, object):
for oid in oids:
data, serial = src.load(oid, src)
try:
blobfilename = src.loadBlob(oid, serial, self._version)
except POSKeyError:
s = self._storage.store(oid, serial, data,
self._version, transaction)
else:
s = self._storage.storeBlob(oid, serial, data, blobfilename,
self._version, transaction)
# we invalidate the object here in order to ensure
# that that the next attribute access of its name
# unghostify it, which will cause its blob data
# to be reattached "cleanly"
self.invalidate(s, {oid:True})
self._handle_serial(s, oid, change=False)
src.close()
def _abort_savepoint(self):
......@@ -1137,9 +1184,14 @@ class Savepoint:
def rollback(self):
self.datamanager._rollback(self.state)
BLOB_SUFFIX = ".blob"
BLOB_DIRTY = "store"
class TmpStore:
"""A storage-like thing to support savepoints."""
implements(IBlobStorage)
def __init__(self, base_version, storage):
self._storage = storage
for method in (
......@@ -1149,6 +1201,10 @@ class TmpStore:
setattr(self, method, getattr(storage, method))
self._base_version = base_version
tmpdir = os.environ.get('ZODB_BLOB_TEMPDIR')
if tmpdir is None:
tmpdir = tempfile.mkdtemp()
self._blobdir = tmpdir
self._file = tempfile.TemporaryFile()
# position: current file position
# _tpos: file position at last commit point
......@@ -1162,6 +1218,7 @@ class TmpStore:
def close(self):
self._file.close()
shutil.rmtree(self._blobdir)
def load(self, oid, version):
pos = self.index.get(oid)
......@@ -1193,6 +1250,37 @@ class TmpStore:
self.position += l + len(header)
return serial
def storeBlob(self, oid, serial, data, blobfilename, version,
transaction):
serial = self.store(oid, serial, data, version, transaction)
assert isinstance(serial, str) # XXX in theory serials could be
# something else
targetpath = self._getBlobPath(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
targetname = self._getCleanFilename(oid, serial)
os.rename(blobfilename, targetname)
def loadBlob(self, oid, serial, version):
"""Return the filename where the blob file can be found.
"""
filename = self._getCleanFilename(oid, serial)
if not os.path.exists(filename):
raise POSKeyError, "Not an existing blob."
return filename
def _getBlobPath(self, oid):
return os.path.join(self._blobdir,
utils.oid_repr(oid)
)
def _getCleanFilename(self, oid, tid):
return os.path.join(self._getBlobPath(oid),
"%s%s" % (utils.tid_repr(tid),
BLOB_SUFFIX,)
)
def reset(self, position, index):
self._file.truncate(position)
self.position = position
......@@ -1206,3 +1294,4 @@ class TmpStore:
# a copy of the index here. An alternative would be to ensure that
# all callers pass copies. As is, our callers do not make copies.
self.index = index.copy()
......@@ -232,10 +232,10 @@ class DB(object):
# Setup storage
self._storage=storage
storage.registerDB(self, None)
if not hasattr(storage,'tpc_vote'):
if not hasattr(storage, 'tpc_vote'):
storage.tpc_vote = lambda *args: None
try:
storage.load(z64,'')
storage.load(z64, '')
except KeyError:
# Create the database's root in the storage if it doesn't exist
from persistent.mapping import PersistentMapping
......
......@@ -13,13 +13,16 @@
##############################################################################
"""Support for database export and import."""
import os
from cStringIO import StringIO
from cPickle import Pickler, Unpickler
from tempfile import TemporaryFile
import logging
from ZODB.POSException import ExportError
from ZODB.utils import p64, u64
from ZODB.POSException import ExportError, POSKeyError
from ZODB.utils import p64, u64, cp, mktemp
from ZODB.Blobs.interfaces import IBlobStorage
from ZODB.serialize import referencesf
logger = logging.getLogger('ZODB.ExportImport')
......@@ -49,6 +52,21 @@ class ExportImport:
else:
referencesf(p, oids)
f.writelines([oid, p64(len(p)), p])
# Blob support
if not IBlobStorage.providedBy(self._storage):
continue
try:
blobfilename = self._storage.loadBlob(oid,
serial, self._version)
except POSKeyError: # Looks like this is not a blob
continue
f.write(blob_begin_marker)
f.write(p64(os.stat(blobfilename).st_size))
blobdata = open(blobfilename, "rb")
cp(blobdata, f)
blobdata.close()
f.write(export_end_marker)
return f
......@@ -113,17 +131,20 @@ class ExportImport:
version = self._version
while 1:
h = f.read(16)
if h == export_end_marker:
header = f.read(16)
if header == export_end_marker:
break
if len(h) != 16:
if len(header) != 16:
raise ExportError("Truncated export file")
l = u64(h[8:16])
p = f.read(l)
if len(p) != l:
# Extract header information
ooid = header[:8]
length = u64(header[8:16])
data = f.read(length)
if len(data) != length:
raise ExportError("Truncated export file")
ooid = h[:8]
if oids:
oid = oids[ooid]
if isinstance(oid, tuple):
......@@ -132,7 +153,21 @@ class ExportImport:
oids[ooid] = oid = self._storage.new_oid()
return_oid_list.append(oid)
pfile = StringIO(p)
# Blob support
blob_begin = f.read(len(blob_begin_marker))
if blob_begin == blob_begin_marker:
# Copy the blob data to a temporary file
# and remember the name
blob_len = u64(f.read(8))
blob_filename = mktemp()
blob_file = open(blob_filename, "wb")
cp(f, blob_file, blob_len)
blob_file.close()
else:
f.seek(-len(blob_begin_marker),1)
blob_filename = None
pfile = StringIO(data)
unpickler = Unpickler(pfile)
unpickler.persistent_load = persistent_load
......@@ -142,12 +177,17 @@ class ExportImport:
pickler.dump(unpickler.load())
pickler.dump(unpickler.load())
p = newp.getvalue()
data = newp.getvalue()
self._storage.store(oid, None, p, version, transaction)
if blob_filename is not None:
self._storage.storeBlob(oid, None, data, blob_filename,
version, transaction)
else:
self._storage.store(oid, None, data, version, transaction)
export_end_marker = '\377'*16
blob_begin_marker = '\000BLOBSTART'
class Ghost(object):
__slots__ = ("oid",)
......
......@@ -628,7 +628,7 @@ class FileStorage(BaseStorage.BaseStorage,
finally:
self._lock_release()
def store(self, oid, serial, data, version, transaction):
def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
......@@ -651,12 +651,12 @@ class FileStorage(BaseStorage.BaseStorage,
pnv = h.pnv
cached_tid = h.tid
if serial != cached_tid:
if oldserial != cached_tid:
rdata = self.tryToResolveConflict(oid, cached_tid,
serial, data)
oldserial, data)
if rdata is None:
raise POSException.ConflictError(
oid=oid, serials=(cached_tid, serial), data=data)
oid=oid, serials=(cached_tid, oldserial), data=data)
else:
data = rdata
......@@ -686,7 +686,7 @@ class FileStorage(BaseStorage.BaseStorage,
raise FileStorageQuotaError(
"The storage quota has been exceeded.")
if old and serial != cached_tid:
if old and oldserial != cached_tid:
return ConflictResolution.ResolvedSerial
else:
return self._tid
......
......@@ -65,6 +65,11 @@
<sectiontype name="zeoclient" datatype=".ZEOClient"
implements="ZODB.storage">
<multikey name="server" datatype="socket-connection-address" required="yes"/>
<key name="blob-dir" required="no">
<description>
Path name to the blob storage directory.
</description>
</key>
<key name="storage" default="1">
<description>
The name of the storage that the client wants to use. If the
......@@ -189,4 +194,18 @@
</description>
</sectiontype>
<sectiontype name="blobstorage" datatype=".BlobStorage"
implements="ZODB.storage">
<key name="blob-dir" required="yes">
<description>
Path name to the blob storage directory.
</description>
</key>
<section type="ZODB.storage" name="*" attribute="base"/>
</sectiontype>
</component>
......@@ -86,7 +86,7 @@ class BaseConfig:
self.config = config
self.name = config.getSectionName()
def open(self):
def open(self, database_name='unnamed', databases=None):
"""Open and return the storage object."""
raise NotImplementedError
......@@ -134,6 +134,14 @@ class FileStorage(BaseConfig):
read_only=self.config.read_only,
quota=self.config.quota)
class BlobStorage(BaseConfig):
def open(self):
from ZODB.Blobs.BlobStorage import BlobStorage
base = self.config.base.open()
return BlobStorage(self.config.blob_dir, base)
class ZEOClient(BaseConfig):
def open(self):
......@@ -143,6 +151,7 @@ class ZEOClient(BaseConfig):
L = [server.address for server in self.config.server]
return ClientStorage(
L,
blob_dir=self.config.blob_dir,
storage=self.config.storage,
cache_size=self.config.cache_size,
name=self.config.name,
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Support for testing logging code
If you want to test that your code generates proper log output, you
can create and install a handler that collects output:
>>> handler = InstalledHandler('foo.bar')
The handler is installed into loggers for all of the names passed. In
addition, the logger level is set to 1, which means, log
everything. If you want to log less than everything, you can provide a
level keyword argument. The level setting effects only the named
loggers.
Then, any log output is collected in the handler:
>>> logging.getLogger('foo.bar').exception('eek')
>>> logging.getLogger('foo.bar').info('blah blah')
>>> for record in handler.records:
... print record.name, record.levelname
... print ' ', record.getMessage()
foo.bar ERROR
eek
foo.bar INFO
blah blah
A similar effect can be gotten by just printing the handler:
>>> print handler
foo.bar ERROR
eek
foo.bar INFO
blah blah
After checking the log output, you need to uninstall the handler:
>>> handler.uninstall()
At which point, the handler won't get any more log output.
Let's clear the handler:
>>> handler.clear()
>>> handler.records
[]
And then log something:
>>> logging.getLogger('foo.bar').info('blah')
and, sure enough, we still have no output:
>>> handler.records
[]
$Id: loggingsupport.py 28349 2004-11-06 00:10:32Z tim_one $
"""
import logging
class Handler(logging.Handler):
def __init__(self, *names, **kw):
logging.Handler.__init__(self)
self.names = names
self.records = []
self.setLoggerLevel(**kw)
def setLoggerLevel(self, level=1):
self.level = level
self.oldlevels = {}
def emit(self, record):
self.records.append(record)
def clear(self):
del self.records[:]
def install(self):
for name in self.names:
logger = logging.getLogger(name)
self.oldlevels[name] = logger.level
logger.setLevel(self.level)
logger.addHandler(self)
def uninstall(self):
for name in self.names:
logger = logging.getLogger(name)
logger.setLevel(self.oldlevels[name])
logger.removeHandler(self)
def __str__(self):
return '\n'.join(
[("%s %s\n %s" %
(record.name, record.levelname,
'\n'.join([line
for line in record.getMessage().split('\n')
if line.strip()])
)
)
for record in self.records]
)
class InstalledHandler(Handler):
def __init__(self, *names):
Handler.__init__(self, *names)
self.install()
......@@ -102,6 +102,9 @@ class ZEOConfigTest(ConfigTestBase):
# an elaborate comment explaining this instead. Go ahead,
# grep for 9.
from ZEO.ClientStorage import ClientDisconnected
import ZConfig
from ZODB.config import getDbSchema
from StringIO import StringIO
cfg = """
<zodb>
<zeoclient>
......@@ -110,6 +113,23 @@ class ZEOConfigTest(ConfigTestBase):
</zeoclient>
</zodb>
"""
config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg))
self.assertEqual(config.database.config.storage.config.blob_dir,
None)
self.assertRaises(ClientDisconnected, self._test, cfg)
cfg = """
<zodb>
<zeoclient>
blob-dir /tmp
server localhost:56897
wait false
</zeoclient>
</zodb>
"""
config, handle = ZConfig.loadConfigFile(getDbSchema(), StringIO(cfg))
self.assertEqual(config.database.config.storage.config.blob_dir,
'/tmp')
self.assertRaises(ClientDisconnected, self._test, cfg)
......
......@@ -193,3 +193,4 @@ However, using a savepoint invalidates any savepoints that come after it:
InvalidSavepointRollbackError
>>> transaction.abort()
......@@ -16,11 +16,13 @@ import sys
import time
import struct
from struct import pack, unpack
from binascii import hexlify
from binascii import hexlify, unhexlify
import cPickle as pickle
from cStringIO import StringIO
import weakref
import warnings
from tempfile import mkstemp
import os
from persistent.TimeStamp import TimeStamp
......@@ -82,20 +84,33 @@ def u64(v):
U64 = u64
def cp(f1, f2, l):
def cp(f1, f2, length=None):
"""Copy all data from one file to another.
It copies the data from the current position of the input file (f1)
appending it to the current position of the output file (f2).
It copies at most 'length' bytes. If 'length' isn't given, it copies
until the end of the input file.
"""
read = f1.read
write = f2.write
n = 8192
while l > 0:
if n > l:
n = l
d = read(n)
if not d:
if length is None:
old_pos = f1.tell()
f1.seek(0,2)
length = f1.tell()
f1.seek(old_pos)
while length > 0:
if n > length:
n = length
data = read(n)
if not data:
break
write(d)
l = l - len(d)
write(data)
length -= len(data)
def newTimeStamp(old=None,
TimeStamp=TimeStamp,
......@@ -120,6 +135,13 @@ def oid_repr(oid):
else:
return repr(oid)
def repr_to_oid(repr):
if repr.startswith("0x"):
repr = repr[2:]
as_bin = unhexlify(repr)
as_bin = "\x00"*(8-len(as_bin)) + as_bin
return as_bin
serial_repr = oid_repr
tid_repr = serial_repr
......@@ -265,3 +287,12 @@ class WeakSet(object):
# We're cheating by breaking into the internals of Python's
# WeakValueDictionary here (accessing its .data attribute).
return self.data.data.values()
def mktemp(dir=None):
"""Create a temp file, known by name, in a semi-secure manner."""
handle, filename = mkstemp(dir=dir)
os.close(handle)
return filename
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment