Commit e8a64ca1 authored by Christian Theune's avatar Christian Theune

- first stuff for blob support

parent fd91affd
...@@ -135,7 +135,8 @@ exts += [cPersistence, cPickleCache, TimeStamp, winlock, cZopeInterface] ...@@ -135,7 +135,8 @@ exts += [cPersistence, cPickleCache, TimeStamp, winlock, cZopeInterface]
packages = ["BTrees", "BTrees.tests", packages = ["BTrees", "BTrees.tests",
"ZEO", "ZEO.auth", "ZEO.zrpc", "ZEO.tests", "ZEO", "ZEO.auth", "ZEO.zrpc", "ZEO.tests",
"ZODB", "ZODB.FileStorage", "ZODB.tests", "ZODB", "ZODB.FileStorage", "ZODB.Blobs",
"ZODB.tests",
"Persistence", "Persistence.tests", "Persistence", "Persistence.tests",
"persistent", "persistent.tests", "persistent", "persistent.tests",
"transaction", "transaction.tests", "transaction", "transaction.tests",
......
import os
from zope.interface import implements
from ZODB.Blobs.interfaces import IBlob
from ZODB.Blobs.exceptions import BlobError
from ZODB import utils
from persistent import Persistent
class TempFileHandler(object):
"""Handles holding a tempfile around.
The tempfile is unlinked when the tempfilehandler is GCed.
"""
def __init__(self, directory, mode)
self.handle, self.filename = tempfile.mkstemp(dir=directory,
text=mode)
def __del__(self):
self.handle
os.unlink(self.filename)
class Blob(Persistent):
implements(IBlob)
def __init__(self):
self._p_blob_readers = 0
self._p_blob_writers = 0
self._p_blob_uncommitted = None
self._p_blob_data = None
def open(self, mode):
"""Returns a file(-like) object for handling the blob data."""
if mode == "r":
if self._current_filename() is None:
raise BlobError, "Blob does not exist."
if self._p_blob_writers != 0:
raise BlobError, "Already opened for writing."
self._p_blob_readers += 1
return BlobTempFile(self._current_filename(), "rb", self)
if mode == "w":
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
if self._p_blob_uncommitted is None:
self._p_blob_uncommitted = self._get_uncommitted_filename()
self._p_blob_writers += 1
return BlobTempFile(self._p_blob_uncommitted, "wb", self)
if mode =="a":
if self._current_filename() is None:
raise BlobError, "Blob does not exist."
if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading."
if not self._p_blob_uncommitted:
# Create a new working copy
self._p_blob_uncommitted = self._get_uncommitted_filename()
uncommitted = BlobTempFile(self._p_blob_uncommitted, "wb", self)
utils.cp(file(self._p_blob_data), uncommitted)
uncommitted.seek(0)
else:
# Re-use existing working copy
uncommitted = BlobTempFile(self._p_blob_uncommitted, "ab", self)
self._p_blob_writers +=1
return uncommitted
# utility methods
def _current_filename(self):
return self._p_blob_uncommitted or self._p_blob_data
def _get_uncommitted_filename(self):
return os.tempnam()
class BlobFileBase:
# XXX those files should be created in the same partition as
# the storage later puts them to avoid copying them ...
def __init__(self, name, mode, blob):
file.__init__(self, name, mode)
self.blob = blob
def write(self, data):
file.write(self, data)
self.blob._p_changed = 1
def writelines(self, lines):
file.writelines(self, lines)
self.blob._p_changed = 1
def truncate(self, size):
file.truncate(self, size)
self.blob._p_changed = 1
def close(self):
if (self.mode.startswith("w") or
self.mode.startswith("a")):
self.blob._p_blob_writers -= 1
else:
self.blob._p_blob_readers -= 1
file.close(self)
class BlobFile(BlobFileBase, file):
pass
class BlobTempFile(BlobFileBase, NamedTempFile)
pass
def copy_file(old, new):
for chunk in old.read(4096):
new.write(chunk)
new.seek(0)
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from zope.interface import implements
from zope.proxy import ProxyBase
from ZODB.interfaces import \
IStorageAdapter, IUndoableStorage, IVersioningStorage, IBlobStorage
class BlobStorage(ProxyBase):
"""A storage to support blobs."""
implements(IBlobStorage)
__slots__ = ('base_directory',)
def __init__(self, base_directory, storage):
ProxyBase.__init__(self, storage)
self.base_directory = base_directory
def storeBlob(oid, serial, data, blob, version, transaction):
"""Stores data that has a BLOB attached."""
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
#
finally:
self._lock_release()
return self._tid
def loadBlob(oid, serial, version, blob):
"""Loads the BLOB data for 'oid' into the given blob object.
"""
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
ZODB Blob support
=================
You create a blob like this:
>>> from ZODB.Blobs.Blob import Blob
>>> myblob = Blob()
Opening a new Blob for reading fails:
>>> myblob.open("r")
Traceback (most recent call last):
...
BlobError: Blob does not exist.
But we can write data to a new Blob by opening it for writing:
>>> f = myblob.open("w")
>>> f.write("Hi, Blob!")
If we try to open a Blob again while it is open for writing, we get an error:
>>> myblob.open("r")
Traceback (most recent call last):
...
BlobError: Already opened for writing.
We can close the file:
>>> f.close()
Now we can open it for reading:
>>> f2 = myblob.open("r")
And we get the data back:
>>> f2.read()
'Hi, Blob!'
If we want to, we can open it again:
>>> f3 = myblob.open("r")
>>> f3.read()
'Hi, Blob!'
But we can't open it for writing, while it is opened for reading:
>>> myblob.open("a")
Traceback (most recent call last):
...
BlobError: Already opened for reading.
Before we can write, we have to close the readers:
>>> f2.close()
>>> f3.close()
Now we can open it for writing again and e.g. append data:
>>> f4 = myblob.open("a")
>>> f4.write("\nBlob is fine.")
>>> f4.close()
Now we can read it:
>>> myblob.open("r").read()
'Hi, Blob!\nBlob is fine.'
- Blob instances should clean up temporary files after committing
Goal: Handle BLOBs (within the Zope context) better.
Measure:
- Don't block ZServer on uploads and downloads
- Don't hold BLOBS in memory or cache if not necessary (LRU caches tend
to break if we split BLOBs in lot of small objects. Size-based caches
tend to break on single large objects)
- Transparent for other systems, support normal ZODB operations.
Comments:
- Cache: BLOBs could be cached in a seperate "BLOB" space, e.g. in single files
- Be storage independent?
- Memory efficiency: Storge.load() currently holds all data of an object in a string.
Steps:
- simple aspects:
- blobs should be known by zodb
- storages, esp. clientstorage must be able to recognize blobs
- to avoid putting blob data into the client cache.
- blob data mustn't end up in the object cache
- blob object and blob data need to be handled separately
- blob data on client is stored in temporary files
- complicated aspects
- temporary files holding blob data could server as a separated cache for blob data
- storage / zodb api change
-
Restrictions:
- a particular BLOB instance can't be open for read _and_ write at the same time
- Allowed: N readers, no writers; 1 writer, no readers
- Reason:
- Data has been committed? -> File(name) for commited data available
- .open("r") on fresh loaded blob returns committed data
- first .open("w") -> new empty file for uncommitted data
- .open("a") or .open("r+"), we copy existing data into file for uncommitted data
- if uncommitted data exists, subsequent .open("*") will use the uncommitted data
- if opened for writing, the object is marked as changed (optimiziation possible)
- connections want to recognize blobs on transaction boundaries
-
from zope.interface import Interface
class IBlob(Interface):
"""A BLOB supports efficient handling of large data within ZODB."""
def open(mode):
"""Returns a file(-like) object for handling the blob data.
mode: Mode to open the file with. Possible values: r,w,r+,a
"""
# XXX need a method to initialize the blob from the storage
# this means a) setting the _p_blob_data filename and b) putting
# the current data in that file
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Connection support for Blobs tests
==================================
Connections handle Blobs specially. To demonstrate that, we first
need a Blob with some data:
>>> from ZODB.Blobs.interfaces import IBlob
>>> from ZODB.Blobs.Blob import Blob
>>> blob = Blob()
>>> data = blob.open("w")
>>> data.write("I'm a happy Blob.")
We also need a database with a blob supporting storage:
>>> from ZODB.MappingStorage import MappingStorage
>>> from tempfile import mkdtemp
>>> base_storage = MappingStorage("test")
>>> blob_dir = mkdtemp()
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(storage)
Putting a Blob into a Connection works like every other object:
>>> connection = database.open()
>>> root = connection.root()
>>> root['myblob'] = blob
>>> import transaction
>>> transaction.commit()
>>> connection.close()
Getting stuff out of there works similar:
>>> connection = database.open()
>>> root = connection.root()
>>> blob2 = root['myblob']
>>> IBlob.isImplementedBy(blob2)
True
>>> blob2.open("r").read()
"I'm a happy Blob."
You can't put blobs into a database that has uses a Non-Blob-Storage, though:
>>> no_blob_storage = MappingStorage()
>>> database2 = DB(no_blob_storage)
>>> connection = database.open()
>>> root = connection.root()
>>> root['myblob'] = blob
>>> transaction.commit()
Traceback (most recent call last):
...
POSException.Unsupported: Storing Blobs is not supported.
While we are testing this, we don't need the storage directory and databases anymore:
>>> import os
>>> os.unlink(blob_dir)
>>> database.close()
>>> database2.close()
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from zope.testing.doctestunit import DocFileSuite
def test_suite():
return DocFileSuite("../README.txt")
...@@ -635,6 +635,7 @@ class Connection(ExportImport, object): ...@@ -635,6 +635,7 @@ class Connection(ExportImport, object):
raise ConflictError(object=obj) raise ConflictError(object=obj)
self._modified.append(oid) self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj p = writer.serialize(obj) # This calls __getstate__ of obj
s = self._storage.store(oid, serial, p, self._version, transaction) s = self._storage.store(oid, serial, p, self._version, transaction)
self._store_count += 1 self._store_count += 1
# Put the object in the cache before handling the # Put the object in the cache before handling the
......
...@@ -39,3 +39,14 @@ class IConnection(zope.interface.Interface): ...@@ -39,3 +39,14 @@ class IConnection(zope.interface.Interface):
must implement the IPersistent interface and must not must implement the IPersistent interface and must not
already be associated with a Connection. already be associated with a Connection.
""" """
class IBlobStorage(zope.interface.Interface):
"""A storage supporting BLOBs."""
def storeBlob(oid, serial, data, blob, version, transaction):
"""Stores data that has a BLOB attached."""
def loadBlob(oid, serial, version, blob):
"""Loads the BLOB data for 'oid' into the given blob object.
"""
...@@ -86,20 +86,27 @@ def u64(v): ...@@ -86,20 +86,27 @@ def u64(v):
U64 = u64 U64 = u64
def cp(f1, f2, l): def cp(f1, f2, length):
"""Copy all data from one file to another.
It copies the data from the current position of the input file (f1)
appending it to the current position of the output file (f2).
It copies at most 'length' bytes. If 'length' isn't given, it copies
until the end of the input file.
"""
read = f1.read read = f1.read
write = f2.write write = f2.write
n = 8192 n = 8192
while l > 0: while length > 0:
if n > l: if n > length:
n = l n = length
d = read(n) data = read(n)
if not d: if not data:
break break
write(d) write(data)
l = l - len(d) length -= len(data)
def newTimeStamp(old=None, def newTimeStamp(old=None,
TimeStamp=TimeStamp, TimeStamp=TimeStamp,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment