Commit 433068f8 authored by Christian Theune's avatar Christian Theune

- merged ctheune-blobszerocopy branch

 - added notice to history txt that zodb 3.8 now contains blobs :)
parent 7c941664
......@@ -3,6 +3,8 @@ What's new in ZODB3 3.8.0?
==========================
Release date: ???
- Added support for Blobs.
BTrees
------
......
===========================================
How to use NFS to make Blobs more efficient
===========================================
:Author: Christian Theune <ct@gocept.com>
Overview
========
When handling blobs, the biggest goal is to avoid writing operations that
require the blob data to be transferred using up IO resources.
When bringing a blob into the system, at least one O(N) operation has to
happen, e.g. when the blob is uploaded via a network server. The blob should
be extracted as a file on the final storage volume as early as possible,
avoiding further copies.
In a ZEO setup, all data is stored on a networked server and passed to it
using zrpc. This is a major problem for handling blobs, because it will lock
all transactions from committing when storing a single large blob. As a
default, this mechanism works but is not recommended for high-volume
installations.
Shared filesystem
=================
The solution for the transfer problem is to setup various storage parameters
so that blobs are always handled on a single volume that is shared via network
between ZEO servers and clients.
Step 1: Setup a writable shared filesystem for ZEO server and client
--------------------------------------------------------------------
On the ZEO server, create two directories on the volume that will be used by
this setup (assume the volume is accessible via $SERVER/):
- $SERVER/blobs
- $SERVER/tmp
Then export the $SERVER directory using a shared network filesystem like NFS.
Make sure it's writable by the ZEO clients.
Assume the exported directory is available on the client as $CLIENT.
Step 2: Application temporary directories
-----------------------------------------
Applications (i.e. Zope) will put uploaded data in a temporary directory
first. Adjust your TMPDIR, TMP or TEMP environment variable to point to the
shared filesystem:
$ export TMPDIR=$CLIENT/tmp
Step 3: ZEO client caches
-------------------------
Edit the file `zope.conf` on the ZEO client and adjust the configuration of
the `zeoclient` storage with two new variables::
blob-dir = $CLIENT/blobs
blob-cache-writable = yes
Step 4: ZEO server
------------------
Edit the file `zeo.conf` on the ZEO server to configure the blob directory.
Assuming the published storage of the ZEO server is a file storage, then the
configuration should look like this::
<blobstorage 1>
<filestorage>
path $INSTANCE/var/Data.fs
<filestorage>
bob-dir $SERVER/blobs
</blobstorage>
(Remember to manually replace $SERVER and $CLIENT with the exported directory
as accessible bei either the ZEO server or the ZEO client.)
Conclusion
----------
At this point, after restarting your ZEO server and clients, the blob
directory will be shared and a minimum amount of IO will occur when working
with blobs.
......@@ -112,7 +112,7 @@ class ClientStorage(object):
wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0,
username='', password='', realm=None,
blob_dir=None):
blob_dir=None, blob_cache_writable=False):
"""ClientStorage constructor.
This is typically invoked from a custom_zodb.py file.
......@@ -188,6 +188,10 @@ class ClientStorage(object):
blob_dir -- directory path for blob data. 'blob data' is data that
is retrieved via the loadBlob API.
blob_cache_writable -- Flag whether the blob_dir is a writable shared
filesystem that should be used instead of transferring blob data over
zrpc.
Note that the authentication protocol is defined by the server
and is detected by the ClientStorage upon connecting (see
testConnection() and doAuth() for details).
......@@ -315,6 +319,8 @@ class ClientStorage(object):
self._lock = threading.Lock()
# XXX need to check for POSIX-ness here
self.blob_dir = blob_dir
self.blob_cache_writable = blob_cache_writable
if blob_dir is not None:
self.fshelper = FilesystemHelper(blob_dir)
self.fshelper.create()
......@@ -892,6 +898,26 @@ class ClientStorage(object):
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
"""Storage API: store a blob object."""
serials = self.store(oid, serial, data, version, txn)
if self.blob_cache_writable:
self._storeBlob_shared(oid, serial, data, blobfilename, version, txn)
else:
self._storeBlob_copy(oid, serial, data, blobfilename, version, txn)
return serials
def _storeBlob_shared(self, oid, serial, data, filename, version, txn):
# First, move the blob into the blob directory
dir = self.fshelper.getPathForOID(oid)
if not os.path.exists(dir):
os.mkdir(dir)
fd, target = self.fshelper.blob_mkstemp(oid, serial)
os.close(fd)
os.rename(filename, target)
# Now tell the server where we put it
self._server.storeBlobShared(oid, serial, data,
os.path.basename(target), version, id(txn))
def _storeBlob_copy(self, oid, serial, data, blobfilename, version, txn):
"""Version of storeBlob() that copies the data over the ZEO protocol."""
blobfile = open(blobfilename, "rb")
while True:
chunk = blobfile.read(1<<16)
......@@ -904,7 +930,6 @@ class ClientStorage(object):
break
blobfile.close()
os.unlink(blobfilename)
return serials
def _do_load_blob(self, oid, serial, version):
"""Do the actual loading from the RPC server."""
......@@ -999,7 +1024,7 @@ class ClientStorage(object):
def getBlobLock(self):
# indirection to support unit testing
return Lock()
return threading.Lock()
def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
......
......@@ -226,6 +226,10 @@ class StorageServer:
def storeBlob(self, oid, serial, chunk, version, id):
self.rpc.callAsync('storeBlob', oid, serial, chunk, version, id)
def storeBlobShared(self, oid, serial, data, filename, version, id):
self.rpc.callAsync('storeBlobShared', oid, serial, data, filename,
version, id)
##
# Start two-phase commit for a transaction
# @param id id used by client to identify current transaction. The
......
......@@ -482,12 +482,18 @@ class ZEOStorage:
if key not in self.blob_transfer:
tempname = mktemp()
tempfile = open(tempname, "wb")
self.blob_transfer[key] = (tempname, tempfile) # XXX Force close and remove them when Storage closes
# XXX Force close and remove them when Storage closes
self.blob_transfer[key] = (tempname, tempfile)
else:
tempname, tempfile = self.blob_transfer[key]
tempfile.write(chunk)
def storeBlobShared(self, oid, serial, data, filename, version, id):
# Reconstruct the full path from the filename in the OID directory
filename = os.path.join(self.storage.fshelper.getPathForOID(oid),
filename)
self.blob_log.append((oid, serial, data, filename, version))
def loadBlob(self, oid, serial, version, offset):
key = (oid, serial)
if not key in self.blob_loads:
......
......@@ -133,6 +133,9 @@ class GenericTests(
"""Combine tests from various origins in one class."""
blob_cache_writable = False
blob_cache_dir = None
def setUp(self):
logger.info("setUp() %s", self.id())
port = get_port()
......@@ -142,10 +145,12 @@ class GenericTests(
self._pids = [pid]
self._servers = [adminaddr]
self._conf_path = path
if not self.blob_cache_dir:
self.blob_cache_dir = tempfile.mkdtemp() # This is the blob cache for ClientStorage
self._storage = ClientStorage(zport, '1', cache_size=20000000,
min_disconnect_poll=0.5, wait=1,
wait_timeout=60, blob_dir=self.blob_cache_dir)
wait_timeout=60, blob_dir=self.blob_cache_dir,
blob_cache_writable=self.blob_cache_writable)
self._storage.registerDB(DummyDB(), None)
def tearDown(self):
......@@ -397,15 +402,13 @@ test_classes = [OneTimeTests,
ConnectionInvalidationOnReconnect,
]
class BlobAdaptedFileStorageTests(GenericTests):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
def setUp(self):
self.blobdir = tempfile.mkdtemp() # This is the blob directory on the ZEO server
self.filestorage = tempfile.mktemp()
super(BlobAdaptedFileStorageTests, self).setUp()
class CommonBlobTests:
def tearDown(self):
super(BlobAdaptedFileStorageTests, self).tearDown()
if os.path.exists(self.blobdir):
# Might be gone already if the super() method deleted
# the shared directory. Don't worry.
shutil.rmtree(self.blobdir)
def getConfig(self):
......@@ -470,6 +473,46 @@ class BlobAdaptedFileStorageTests(GenericTests):
oid = self._storage.new_oid()
data = zodb_pickle(blob)
t = transaction.Transaction()
try:
self._storage.tpc_begin(t)
r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
r2 = self._storage.tpc_vote(t)
serial = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
filename = self._storage.loadBlob(oid, serial, version)
self.assertEquals(somedata, open(filename, 'rb').read())
class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
def setUp(self):
self.blobdir = tempfile.mkdtemp() # This is the blob directory on the ZEO server
self.filestorage = tempfile.mktemp()
super(BlobAdaptedFileStorageTests, self).setUp()
def checkLoadBlobLocks(self):
from ZODB.Blobs.Blob import Blob
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
handle_serials
import transaction
version = ''
somedata = 'a' * 10
blob = Blob()
bd_fh = blob.open('w')
bd_fh.write(somedata)
bd_fh.close()
tfname = bd_fh.name
oid = self._storage.new_oid()
data = zodb_pickle(blob)
t = transaction.Transaction()
try:
self._storage.tpc_begin(t)
......@@ -527,8 +570,18 @@ class BlobAdaptedFileStorageTests(GenericTests):
self.assertEqual(thestatusdict.added, [(oid, serial)])
self.assertEqual(thestatusdict.removed, [(oid, serial)])
class BlobWritableCacheTests(GenericTests, CommonBlobTests):
def setUp(self):
self.blobdir = self.blob_cache_dir = tempfile.mkdtemp()
self.filestorage = tempfile.mktemp()
self.blob_cache_writable = True
super(BlobWritableCacheTests, self).setUp()
test_classes = [FileStorageTests, MappingStorageTests,
BlobAdaptedFileStorageTests]
BlobAdaptedFileStorageTests, BlobWritableCacheTests]
def test_suite():
suite = unittest.TestSuite()
......
......@@ -17,6 +17,7 @@
__docformat__ = "reStructuredText"
import os
import sys
import time
import tempfile
import logging
......@@ -30,6 +31,8 @@ import transaction
import transaction.interfaces
from persistent import Persistent
if sys.platform == 'win32':
import win32file
BLOB_SUFFIX = ".blob"
......@@ -38,6 +41,12 @@ class Blob(Persistent):
zope.interface.implements(IBlob)
# Binding this to an attribute allows overriding it in the unit tests
if sys.platform == 'win32':
_os_link = lambda src, dst: win32file.CreateHardLink(src, dst, None)
else:
_os_link = os.link
_p_blob_readers = 0
_p_blob_writers = 0
_p_blob_uncommitted = None # Filename of the uncommitted (dirty) data
......@@ -56,39 +65,34 @@ class Blob(Persistent):
def open(self, mode="r"):
"""Returns a file(-like) object representing blob data."""
tempdir = os.environ.get('ZODB_BLOB_TEMPDIR', tempfile.gettempdir())
result = None
if (mode.startswith("r") or mode=="U"):
if self._current_filename() is None:
raise BlobError, "Blob does not exist."
raise BlobError("Blob does not exist.")
if self._p_blob_writers != 0:
raise BlobError, "Already opened for writing."
raise BlobError("Already opened for writing.")
self._p_blob_readers += 1
result = BlobFile(self._current_filename(), mode, self)
elif mode.startswith("w"):
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
if self._p_blob_uncommitted is None:
self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
raise BlobError("Already opened for reading.")
self._p_blob_writers += 1
if self._p_blob_uncommitted is None:
self._create_uncommitted_file()
result = BlobFile(self._p_blob_uncommitted, mode, self)
elif mode.startswith("a"):
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
raise BlobError("Already opened for reading.")
if self._p_blob_uncommitted is None:
# Create a new working copy
self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
uncommitted = BlobFile(self._create_uncommitted_file(), mode, self)
# NOTE: _p_blob data appears by virtue of Connection._setstate
utils.cp(file(self._p_blob_data), uncommitted)
uncommitted.seek(0)
......@@ -100,9 +104,90 @@ class Blob(Persistent):
result = uncommitted
else:
raise IOError, 'invalid mode: %s ' % mode
raise IOError('invalid mode: %s ' % mode)
if result is not None:
self._setup_transaction_manager(result)
return result
def openDetached(self, class_=file):
"""Returns a file(-like) object in read mode that can be used
outside of transaction boundaries.
"""
if self._current_filename() is None:
raise BlobError("Blob does not exist.")
if self._p_blob_writers != 0:
raise BlobError("Already opened for writing.")
# XXX this should increase the reader number and have a test !?!
return class_(self._current_filename(), "rb")
def consumeFile(self, filename):
"""Will replace the current data of the blob with the file given under
filename.
"""
if self._p_blob_writers != 0:
raise BlobError("Already opened for writing.")
if self._p_blob_readers != 0:
raise BlobError("Already opened for reading.")
previous_uncommitted = bool(self._p_blob_uncommitted)
if previous_uncommitted:
# If we have uncommitted data, we move it aside for now
# in case the consumption doesn't work.
target = self._p_blob_uncommitted
target_aside = target+".aside"
os.rename(target, target_aside)
else:
target = self._create_uncommitted_file()
# We need to unlink the freshly created target again
# to allow link() to do its job
os.unlink(target)
try:
self._os_link(filename, target)
except:
# Recover from the failed consumption: First remove the file, it
# might exist and mark the pointer to the uncommitted file.
self._p_blob_uncommitted = None
if os.path.exists(target):
os.unlink(target)
# If there was a file moved aside, bring it back including the pointer to
# the uncommitted file.
if previous_uncommitted:
os.rename(target_aside, target)
self._p_blob_uncommitted = target
# Re-raise the exception to make the application aware of it.
raise
else:
if previous_uncommitted:
# The relinking worked so we can remove the data that we had
# set aside.
os.unlink(target_aside)
# We changed the blob state and have to make sure we join the
# transaction.
self._change()
# utility methods
def _current_filename(self):
# NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
# Connection._setstate
return self._p_blob_uncommitted or self._p_blob_data
def _create_uncommitted_file(self):
assert self._p_blob_uncommitted is None, "Uncommitted file already exists."
tempdir = os.environ.get('ZODB_BLOB_TEMPDIR', tempfile.gettempdir())
self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
return self._p_blob_uncommitted
def _change(self):
self._p_changed = 1
def _setup_transaction_manager(self, result):
# We join the transaction with our own data manager in order to be
# notified of commit/vote/abort events. We do this because at
# transaction boundaries, we need to fix up _p_ reference counts
......@@ -126,34 +211,12 @@ class Blob(Persistent):
# Create our datamanager and join he current transaction.
dm = BlobDataManager(self, result, tm)
tm.get().join(dm)
else:
elif result:
# Each blob data manager should manage only the one blob
# assigned to it. Assert that this is the case and it is the
# correct blob
assert self._p_blob_manager.blob is self
self._p_blob_manager.register_fh(result)
return result
def openDetached(self):
"""Returns a file(-like) object in read mode that can be used
outside of transaction boundaries.
"""
if self._current_filename() is None:
raise BlobError, "Blob does not exist."
if self._p_blob_writers != 0:
raise BlobError, "Already opened for writing."
return file(self._current_filename(), "rb")
# utility methods
def _current_filename(self):
# NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
# Connection._setstate
return self._p_blob_uncommitted or self._p_blob_data
def _change(self):
self._p_changed = 1
# utility methods which should not cause the object's state to be
# loaded if they are called while the object is a ghost. Thus,
......@@ -171,7 +234,7 @@ class Blob(Persistent):
elif mode.startswith('w') or mode.startswith('a'):
self._p_blob_writers = max(0, self._p_blob_writers - 1)
else:
raise AssertionError, 'Unknown mode %s' % mode
raise AssertionError('Unknown mode %s' % mode)
def _p_blob_refcounts(self):
# used by unit tests
......
##############################################################################
#
# Copyright (c) 2005-2007 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Blob-related interfaces
"""
from zope.interface import Interface
class IBlob(Interface):
"""A BLOB supports efficient handling of large data within ZODB."""
......@@ -10,7 +27,7 @@ class IBlob(Interface):
mode: Mode to open the file with. Possible values: r,w,r+,a
"""
def openDetached():
def openDetached(class_=file):
"""Returns a file(-like) object in read mode that can be used
outside of transaction boundaries.
......@@ -19,11 +36,23 @@ class IBlob(Interface):
The handle is not attached to the blob and can be used outside of a
transaction.
Optionally the class that should be used to open the file can be
specified. This can be used to e.g. use Zope's FileStreamIterator.
"""
def consumeFile(filename):
"""Will replace the current data of the blob with the file given under
filename.
This method uses link-like semantics internally and has the requirement
that the file that is to be consumed lives on the same volume (or
mount/share) as the blob directory.
The blob must not be opened for reading or writing when consuming a
file.
"""
# XXX need a method to initialize the blob from the storage
# this means a) setting the _p_blob_data filename and b) putting
# the current data in that file
class IBlobStorage(Interface):
"""A storage supporting BLOBs."""
......@@ -39,4 +68,3 @@ class IBlobStorage(Interface):
Raises POSKeyError if the blobfile cannot be found.
"""
Consuming existing files
========================
The ZODB Blob implementation allows to import existing files as Blobs within
an O(1) operation we call `consume`::
Let's create a file::
>>> import tempfile
>>> to_import = tempfile.NamedTemporaryFile()
>>> to_import.write("I'm a Blob and I feel fine.")
>>> to_import.flush()
Now, let's consume this file in a blob by specifying it's name::
>>> from ZODB.Blobs.Blob import Blob
>>> blob = Blob()
>>> blob.consumeFile(to_import.name)
We now can call open on the blob and read and write the data::
>>> blob_read = blob.open('rb')
>>> blob_read.read()
"I'm a Blob and I feel fine."
>>> blob_read.close()
>>> blob_write = blob.open('w')
>>> blob_write.write('I was changed.')
>>> blob_write.close()
Please note that the interface for the `consume` method specifies a hard-link
as a part of the contract so your existing file and the blob file will be the
same. If one gets changed the other will reflect those changes as well. This
is especially a known side-effect when consuming a file and then opening the
blob for writing before committing in between::
>>> to_import.seek(0)
>>> to_import.read()
'I was changed.'
(Applications are expected that files for consumption are typically copies of
existing data and that the imported link to the file will be removed after a
successfull import. This can be achieved (as in this test) by using a
NamedTempFile.)
We can not consume a file when there is a reader or writer around for a blob
already::
>>> to_import2 = tempfile.NamedTemporaryFile()
>>> to_import2.write('I am another blob.')
>>> to_import2.flush()
>>> blob_read = blob.open('r')
>>> blob.consumeFile(to_import2.name)
Traceback (most recent call last):
BlobError: Already opened for reading.
>>> blob_read.close()
>>> blob_write = blob.open('w')
>>> blob.consumeFile(to_import2.name)
Traceback (most recent call last):
BlobError: Already opened for writing.
>>> blob_write.close()
Now, after closing all readers and writers we can consume files again::
>>> blob.consumeFile(to_import2.name)
>>> blob_read = blob.open('r')
>>> blob_read.read()
'I am another blob.'
Edge cases
==========
There are some edge cases what happens when the link() operation fails. We simulate this in different states:
Case 1: We don't have uncommitted data, but the link operation fails. The
exception will be re-raised and the target file will not exist::
>>> input = tempfile.NamedTemporaryFile()
>>> input.write('Some data')
>>> input.flush()
>>> def failing_link(self, filename):
... raise Exception("I can't link.")
>>> blob = Blob()
>>> blob.open('r')
Traceback (most recent call last):
BlobError: Blob does not exist.
>>> blob._os_link = failing_link
>>> blob.consumeFile(input.name)
Traceback (most recent call last):
Exception: I can't link.
The blob did not exist before, so it shouldn't exist now::
>>> blob.open('r')
Traceback (most recent call last):
BlobError: Blob does not exist.
Case 2: We thave uncommitted data, but the link operation fails. The
exception will be re-raised and the target file will exist with the previous
uncomitted data::
>>> input = tempfile.NamedTemporaryFile()
>>> input.write('Unimported data')
>>> input.flush()
>>> blob = Blob()
>>> blob_writing = blob.open('w')
>>> blob_writing.write('Uncommitted data')
>>> blob_writing.close()
>>> blob._os_link = failing_link
>>> blob.consumeFile(input.name)
Traceback (most recent call last):
Exception: I can't link.
The blob did existed before and had uncommitted data, this shouldn't have
changed::
>>> blob.open('r').read()
'Uncommitted data'
......@@ -16,4 +16,4 @@ from zope.testing.doctestunit import DocFileSuite
def test_suite():
return DocFileSuite("basic.txt", "connection.txt", "transaction.txt",
"packing.txt", "importexport.txt")
"packing.txt", "importexport.txt", "consume.txt")
......@@ -94,6 +94,47 @@ class BlobUndoTests(unittest.TestCase):
self.assertEqual(blob.open('r').read(), 'this is state 1')
transaction.abort()
def testUndoAfterConsumption(self):
base_storage = FileStorage(self.storagefile)
blob_storage = BlobStorage(self.blob_dir, base_storage)
database = DB(blob_storage)
connection = database.open()
root = connection.root()
transaction.begin()
to_consume = tempfile.NamedTemporaryFile()
to_consume.write('this is state 1')
to_consume.flush()
blob = Blob()
blob.consumeFile(to_consume.name)
root['blob'] = blob
transaction.commit()
transaction.begin()
blob = root['blob']
to_consume = tempfile.NamedTemporaryFile()
to_consume.write('this is state 2')
to_consume.flush()
blob.consumeFile(to_consume.name)
transaction.commit()
transaction.begin()
blob = root['blob']
self.assertEqual(blob.open('r').read(), 'this is state 2')
transaction.abort()
serial = base64.encodestring(blob_storage._tid)
transaction.begin()
blob_storage.undo(serial, blob_storage._transaction)
transaction.commit()
transaction.begin()
blob = root['blob']
self.assertEqual(blob.open('r').read(), 'this is state 1')
transaction.abort()
def testRedo(self):
base_storage = FileStorage(self.storagefile)
blob_storage = BlobStorage(self.blob_dir, base_storage)
......
......@@ -67,9 +67,18 @@
<multikey name="server" datatype="socket-connection-address" required="yes"/>
<key name="blob-dir" required="no">
<description>
Path name to the blob storage directory.
Path name to the blob cache directory.
</description>
</key>
<key name="blob-cache-writable" required="no" default="no"
datatype="boolean">
<description>
Tells whether the cache is a shared writable directory
and that the ZEO protocol should not transfer the file
but only the filename when committing.
</description>
</key>
<key name="storage" default="1">
<description>
The name of the storage that the client wants to use. If the
......
......@@ -152,6 +152,7 @@ class ZEOClient(BaseConfig):
return ClientStorage(
L,
blob_dir=self.config.blob_dir,
blob_cache_writable=self.config.blob_cache_writable,
storage=self.config.storage,
cache_size=self.config.cache_size,
name=self.config.name,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment