Commit c7a04601 authored by Christian Theune

- Removed usage of special environment variable to tell blobs where to create
  new uncommitted data.
- Made blobs use the temporaryDirectory() method of storages to store new
  uncommitted data near the committed data.
parent 1e6f658d
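
For orientation, the sketch below (illustrative only: `DemoBlobStorage` and its directory layout are invented for this note and are not ZODB's real `BlobStorage`) shows the contract this commit relies on: a storage exposes a `temporaryDirectory()` method, and blob code creates uncommitted files there instead of consulting an environment variable::

    import os
    import tempfile

    class DemoBlobStorage(object):
        """Illustrative stand-in for a blob-capable storage; it only
        demonstrates the temporaryDirectory() contract used by this commit."""

        def __init__(self, blob_dir):
            self.blob_dir = blob_dir
            # Keep uncommitted blob data on the same filesystem as the
            # committed blob data, so moving it into place later is cheap.
            self._tmp = os.path.join(blob_dir, 'tmp')
            if not os.path.exists(self._tmp):
                os.makedirs(self._tmp)

        def temporaryDirectory(self):
            return self._tmp

    storage = DemoBlobStorage(tempfile.mkdtemp())
    fd, uncommitted = tempfile.mkstemp(dir=storage.temporaryDirectory())
    os.close(fd)
    print(uncommitted.startswith(storage.blob_dir))  # prints: True

Keeping the temporary directory next to the committed blob data is what lets the later move into the final blob location stay on a single filesystem.
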
@@ -324,7 +324,7 @@ class ClientStorage(object):
         if blob_dir is not None:
             # Avoid doing this import unless we need it, as it
             # currently requires pywin32 on Windows.
             import ZODB.blob
             self.fshelper = ZODB.blob.FilesystemHelper(blob_dir)
             self.fshelper.create()
             self.fshelper.checkSecure()
@@ -955,11 +955,11 @@ class ClientStorage(object):
         f = open(blob_filename, 'ab')
         f.write(chunk)
         f.close()

     def recieveBlobStop(self, oid, serial):
         blob_filename = self.fshelper.getBlobFilename(oid, serial)
         os.rename(blob_filename+'.dl', blob_filename)

     def loadBlob(self, oid, serial):
         # Load a blob. If it isn't present and we have a shared blob
@@ -1017,7 +1017,7 @@ class ClientStorage(object):
                 except OSError:
                     pass
             break

         if self._have_blob(blob_filename, oid, serial):
             return blob_filename
@@ -1034,7 +1034,7 @@ class ClientStorage(object):
         # Ask the server to send it to us. When this function
         # returns, it will have been sent. (The recieving will
         # have been handled by the asyncore thread.)
         self._server.sendBlob(oid, serial)

         if self._have_blob(blob_filename, oid, serial):
@@ -1049,6 +1049,9 @@ class ClientStorage(object):
             except OSError:
                 pass

+    def temporaryDirectory(self):
+        return self.blob_dir
+
     def tpc_vote(self, txn):
         """Storage API: vote on a transaction."""
         if txn is not self._transaction:
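
On the ZEO client the new `temporaryDirectory()` method simply returns the blob cache directory (`self.blob_dir`). A short sketch of why that is useful: when uncommitted data is written on the same filesystem as its final blob location, `os.rename()` moves it into place atomically instead of copying across devices, the same pattern the `recieveBlobStop()` handler above uses for the downloaded `.dl` file. The helper below is hypothetical, and `data` is assumed to be a byte string::

    import os
    import tempfile

    def publish_blob(storage, final_path, data):
        # Hypothetical helper: write uncommitted blob data next to the
        # committed data, then atomically rename it into place.
        fd, tmp_path = tempfile.mkstemp(dir=storage.temporaryDirectory())
        try:
            os.write(fd, data)
        finally:
            os.close(fd)
        # tmp_path and final_path are on the same filesystem, so this is a
        # cheap atomic rename rather than a cross-device copy.
        os.rename(tmp_path, final_path)
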
@@ -523,6 +523,10 @@ class CommonBlobTests:
         filename = self._storage.loadBlob(oid, serial)
         self.assertEquals(somedata, open(filename, 'rb').read())

+    def checkTemporaryDirectory(self):
+        self.assertEquals(self.blob_cache_dir,
+                          self._storage.temporaryDirectory())
+
 class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
     """ZEO backed by a BlobStorage-adapted FileStorage."""
@@ -1116,7 +1116,7 @@ class Connection(ExportImport, object):
             data, serial = src.load(oid, src)
             try:
                 blobfilename = src.loadBlob(oid, serial)
-            except POSKeyError:
+            except (POSKeyError, Unsupported):
                 s = self._storage.store(oid, serial, data,
                                         self._version, transaction)
             else:
@@ -1195,10 +1195,6 @@ class TmpStore:
         self.versionEmpty = storage.versionEmpty
         self._base_version = base_version
-        tmpdir = os.environ.get('ZODB_BLOB_TEMPDIR')
-        if tmpdir is None:
-            tmpdir = tempfile.mkdtemp()
-        self._blobdir = tmpdir
         self._file = tempfile.TemporaryFile()
         # position: current file position
         # _tpos: file position at last commit point
@@ -1212,7 +1208,6 @@ class TmpStore:

     def close(self):
         self._file.close()
-        shutil.rmtree(self._blobdir)

     def load(self, oid, version):
         pos = self.index.get(oid)
@@ -1260,13 +1255,17 @@ class TmpStore:

     def loadBlob(self, oid, serial):
         """Return the filename where the blob file can be found.
         """
+        if not IBlobStorage.providedBy(self._storage):
+            raise Unsupported(
+                "Blobs are not supported by the underlying storage %r." %
+                self._storage)
         filename = self._getCleanFilename(oid, serial)
         if not os.path.exists(filename):
             raise POSKeyError, "Not an existing blob."
         return filename

     def _getBlobPath(self, oid):
-        return os.path.join(self._blobdir,
+        return os.path.join(self.temporaryDirectory(),
                             utils.oid_repr(oid)
                             )
@@ -1275,6 +1274,10 @@
                             "%s%s" % (utils.tid_repr(tid),
                                       BLOB_SUFFIX,)
                             )

+    def temporaryDirectory(self):
+        return self._storage.temporaryDirectory()
+
     def reset(self, position, index):
         self._file.truncate(position)
         self.position = position
@@ -1288,4 +1291,3 @@ class TmpStore:
         # a copy of the index here. An alternative would be to ensure that
         # all callers pass copies. As is, our callers do not make copies.
         self.index = index.copy()
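
The `TmpStore` changes follow a simple delegation pattern: the temporary store forwards `temporaryDirectory()` to the real storage and raises `Unsupported` from `loadBlob()` when the underlying storage has no blob support. A minimal sketch of that pattern (the wrapper class below is illustrative, not the actual `TmpStore`; the local `Unsupported` class stands in for `ZODB.POSException.Unsupported`, and the real code checks `IBlobStorage.providedBy()` rather than `hasattr()`)::

    class Unsupported(Exception):
        # Stand-in for ZODB.POSException.Unsupported so the sketch runs
        # without a ZODB installation.
        pass

    class DemoTmpStore(object):
        # Illustrative wrapper, not the real TmpStore.

        def __init__(self, storage):
            self._storage = storage

        def temporaryDirectory(self):
            # Uncommitted blob data should live near the committed data
            # of the real storage, so just delegate.
            return self._storage.temporaryDirectory()

        def loadBlob(self, oid, serial):
            if not hasattr(self._storage, 'loadBlob'):
                raise Unsupported(
                    "Blobs are not supported by the underlying storage %r."
                    % self._storage)
            return self._storage.loadBlob(oid, serial)

Callers can then treat `Unsupported` like a missing blob, which is exactly what the `except (POSKeyError, Unsupported)` change in the Connection hunk above does.
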
@@ -257,7 +257,7 @@ class DB(object):
             storage.registerDB(self)
         except TypeError:
             storage.registerDB(self, None) # Backward compat

         if (not hasattr(storage, 'tpc_vote')) and not storage.isReadOnly():
             warnings.warn(
                 "Storage doesn't have a tpc_vote and this violates "
@@ -64,7 +64,7 @@ class Blob(persistent.Persistent):
         # atomically
         self.readers = []
         self.writers = []

     __init__ = __setstate__

     def __getstate__(self):
@@ -90,17 +90,15 @@ class Blob(persistent.Persistent):
             and os.path.exists(self._p_blob_uncommitted)
             ):
             os.remove(self._p_blob_uncommitted)
         super(Blob, self)._p_invalidate()

     def opened(self):
         return bool(self.readers or self.writers)

     def closed(self, f):
         # We use try/except below because another thread might remove
         # the ref after we check it if the file is GCed.
         for file_refs in (self.readers, self.writers):
             for ref in file_refs:
                 if ref() is f:
@@ -128,7 +126,7 @@ class Blob(persistent.Persistent):
                     readers.remove(ref)
                 except ValueError:
                     pass

             self.readers.append(weakref.ref(result, destroyed))
         else:
             if self.readers:
@@ -155,7 +153,7 @@ class Blob(persistent.Persistent):
                     writers.remove(ref)
                 except ValueError:
                     pass

             self.writers.append(weakref.ref(result, destroyed))

         self._p_changed = True
@@ -233,7 +231,10 @@ class Blob(persistent.Persistent):
     def _create_uncommitted_file(self):
         assert self._p_blob_uncommitted is None, (
             "Uncommitted file already exists.")
-        tempdir = os.environ.get('ZODB_BLOB_TEMPDIR', tempfile.gettempdir())
+        if self._p_jar:
+            tempdir = self._p_jar.db()._storage.temporaryDirectory()
+        else:
+            tempdir = tempfile.gettempdir()
         self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
         return self._p_blob_uncommitted
@@ -15,66 +15,66 @@
 ZODB Blob support
 =================

-You create a blob like this:
+You create a blob like this::

     >>> from ZODB.blob import Blob
     >>> myblob = Blob()

-A blob implements the IBlob interface:
+A blob implements the IBlob interface::

     >>> from ZODB.interfaces import IBlob
     >>> IBlob.providedBy(myblob)
     True

-We can open a new blob file for reading, but it won't have any data:
+We can open a new blob file for reading, but it won't have any data::

     >>> myblob.open("r").read()
     ''

-But we can write data to a new Blob by opening it for writing:
+But we can write data to a new Blob by opening it for writing::

     >>> f = myblob.open("w")
     >>> f.write("Hi, Blob!")

-If we try to open a Blob again while it is open for writing, we get an error:
+If we try to open a Blob again while it is open for writing, we get an error::

     >>> myblob.open("r")
     Traceback (most recent call last):
         ...
     BlobError: Already opened for writing.

-We can close the file:
+We can close the file::

     >>> f.close()

-Now we can open it for reading:
+Now we can open it for reading::

     >>> f2 = myblob.open("r")

-And we get the data back:
+And we get the data back::

     >>> f2.read()
     'Hi, Blob!'

-If we want to, we can open it again:
+If we want to, we can open it again::

     >>> f3 = myblob.open("r")
     >>> f3.read()
     'Hi, Blob!'

-But we can't open it for writing, while it is opened for reading:
+But we can't open it for writing, while it is opened for reading::

     >>> myblob.open("a")
     Traceback (most recent call last):
         ...
     BlobError: Already opened for reading.

-Before we can write, we have to close the readers:
+Before we can write, we have to close the readers::

     >>> f2.close()
     >>> f3.close()

-Now we can open it for writing again and e.g. append data:
+Now we can open it for writing again and e.g. append data::

     >>> f4 = myblob.open("a")
     >>> f4.write("\nBlob is fine.")
@@ -93,7 +93,7 @@ We can't open a blob while it is open for writing:

     >>> f4.close()

-Now we can read it:
+Now we can read it::

     >>> f4a = myblob.open("r")
     >>> f4a.read()
@@ -103,14 +103,14 @@ Now we can read it:
 You shouldn't need to explicitly close a blob unless you hold a reference
 to it via a name. If the first line in the following test kept a reference
 around via a name, the second call to open it in a writable mode would fail
-with a BlobError, but it doesn't.
+with a BlobError, but it doesn't::

     >>> myblob.open("r+").read()
     'Hi, Blob!\nBlob is fine.'
     >>> f4b = myblob.open("a")
     >>> f4b.close()

-We can read lines out of the blob too:
+We can read lines out of the blob too::

     >>> f5 = myblob.open("r")
     >>> f5.readline()
@@ -119,7 +119,7 @@ We can read lines out of the blob too:
     'Blob is fine.'
     >>> f5.close()

-We can seek to certain positions in a blob and read portions of it:
+We can seek to certain positions in a blob and read portions of it::

     >>> f6 = myblob.open('r')
     >>> f6.seek(4)
@@ -129,7 +129,7 @@ We can seek to certain positions in a blob and read portions of it:
     'Blob!'
     >>> f6.close()

-We can use the object returned by a blob open call as an iterable:
+We can use the object returned by a blob open call as an iterable::

     >>> f7 = myblob.open('r')
     >>> for line in f7:
@@ -139,7 +139,7 @@ We can use the object returned by a blob open call as an iterable:
     Blob is fine.
     >>> f7.close()

-We can truncate a blob:
+We can truncate a blob::

     >>> f8 = myblob.open('a')
     >>> f8.truncate(0)
@@ -149,30 +149,14 @@ We can truncate a blob:
     ''
     >>> f8.close()

-Blobs are always opened in binary mode:
+Blobs are always opened in binary mode::

     >>> f9 = myblob.open("r")
     >>> f9.mode
     'rb'
     >>> f9.close()

-We can specify the tempdir that blobs use to keep uncommitted data by
-modifying the ZODB_BLOB_TEMPDIR environment variable:
-
-    >>> import os, tempfile, shutil
-    >>> tempdir = tempfile.mkdtemp()
-    >>> os.environ['ZODB_BLOB_TEMPDIR'] = tempdir
-    >>> myblob = Blob()
-    >>> len(os.listdir(tempdir))
-    0
-    >>> f = myblob.open('w')
-    >>> len(os.listdir(tempdir))
-    1
-    >>> f.close()
-    >>> shutil.rmtree(tempdir)
-    >>> del os.environ['ZODB_BLOB_TEMPDIR']
-
-Some cleanup in this test is needed:
+Some cleanup in this test is needed::

     >>> import transaction
     >>> transaction.get().abort()
@@ -35,7 +35,7 @@ We also need a database with a blob supporting storage:
     >>> blob_dir = mkdtemp()
     >>> blob_storage = BlobStorage(blob_dir, base_storage)
     >>> database = DB(blob_storage)

 Putting a Blob into a Connection works like every other object:

     >>> connection = database.open()
+=======================================
+Temporary directory handling with blobs
+=======================================
+
+When creating uncommitted data files for a blob (e.g. by calling
+`blob.open('w')`) we need to decide where to create them. The decision depends
+on whether the blob is already stored in a database or not.
+
+Case 1: Blobs that are not in a database yet
+============================================
+
+Let's create a new blob and open it for writing::
+
+  >>> from ZODB.blob import Blob
+  >>> b = Blob()
+  >>> w = b.open('w')
+
+The created file is in the default temporary directory::
+
+  >>> import tempfile
+  >>> w.name.startswith(tempfile.gettempdir())
+  True
+
+Case 2: Blobs that are in a database
+====================================
+
+For this case we instanciate a blob and add it to a database immediately.
+First, we need a datatabase with blob support::
+
+  >>> from ZODB.MappingStorage import MappingStorage
+  >>> from ZODB.blob import BlobStorage
+  >>> from ZODB.DB import DB
+  >>> from tempfile import mkdtemp
+  >>> base_storage = MappingStorage("test")
+  >>> blob_dir = mkdtemp()
+  >>> blob_storage = BlobStorage(blob_dir, base_storage)
+  >>> database = DB(blob_storage)
+
+Now we create a blob and put it in the database. After that we open it for
+writing and expect the file to be in the blob directory::
+
+  >>> blob = Blob()
+  >>> connection = database.open()
+  >>> connection.add(blob)
+  >>> w = blob.open('w')
+  >>> w.name.startswith(blob_dir)
+  True
@@ -271,6 +271,7 @@ def test_suite():
     suite.addTest(doctest.DocFileSuite(
         "blob_basic.txt", "blob_connection.txt", "blob_transaction.txt",
         "blob_packing.txt", "blob_importexport.txt", "blob_consume.txt",
+        "blob_tempdir.txt",
         setUp=ZODB.tests.util.setUp,
         tearDown=ZODB.tests.util.tearDown,
         ))
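
To exercise just the new doctest outside the full suite, something like the following should work (a sketch only; the real `test_suite()` above also wires in `ZODB.tests.util.setUp` and `tearDown`, and the relative path lookup for the file is assumed)::

    import doctest
    import unittest

    # Run only the new blob_tempdir.txt doctest.
    suite = doctest.DocFileSuite("blob_tempdir.txt")
    unittest.TextTestRunner(verbosity=2).run(suite)
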