Commit 9afaebbc authored by Christian Theune's avatar Christian Theune

- Removed usage of special environment variable to tell blobs where to create

   new uncommitted data.
 - Made blobs use the temporaryDirectory() method of storages to store new
   uncommitted data near the committed data.
parent 6bb9f884
......@@ -324,7 +324,7 @@ class ClientStorage(object):
if blob_dir is not None:
# Avoid doing this import unless we need it, as it
# currently requires pywin32 on Windows.
import ZODB.blob
import ZODB.blob
self.fshelper = ZODB.blob.FilesystemHelper(blob_dir)
self.fshelper.create()
self.fshelper.checkSecure()
......@@ -955,11 +955,11 @@ class ClientStorage(object):
f = open(blob_filename, 'ab')
f.write(chunk)
f.close()
def recieveBlobStop(self, oid, serial):
blob_filename = self.fshelper.getBlobFilename(oid, serial)
os.rename(blob_filename+'.dl', blob_filename)
def loadBlob(self, oid, serial):
# Load a blob. If it isn't present and we have a shared blob
......@@ -1017,7 +1017,7 @@ class ClientStorage(object):
except OSError:
pass
break
if self._have_blob(blob_filename, oid, serial):
return blob_filename
......@@ -1034,7 +1034,7 @@ class ClientStorage(object):
# Ask the server to send it to us. When this function
# returns, it will have been sent. (The recieving will
# have been handled by the asyncore thread.)
self._server.sendBlob(oid, serial)
if self._have_blob(blob_filename, oid, serial):
......@@ -1049,6 +1049,9 @@ class ClientStorage(object):
except OSError:
pass
def temporaryDirectory(self):
return self.blob_dir
def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
if txn is not self._transaction:
......
......@@ -523,6 +523,10 @@ class CommonBlobTests:
filename = self._storage.loadBlob(oid, serial)
self.assertEquals(somedata, open(filename, 'rb').read())
def checkTemporaryDirectory(self):
self.assertEquals(self.blob_cache_dir,
self._storage.temporaryDirectory())
class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
......
......@@ -1116,7 +1116,7 @@ class Connection(ExportImport, object):
data, serial = src.load(oid, src)
try:
blobfilename = src.loadBlob(oid, serial)
except POSKeyError:
except (POSKeyError, Unsupported):
s = self._storage.store(oid, serial, data,
self._version, transaction)
else:
......@@ -1195,10 +1195,6 @@ class TmpStore:
self.versionEmpty = storage.versionEmpty
self._base_version = base_version
tmpdir = os.environ.get('ZODB_BLOB_TEMPDIR')
if tmpdir is None:
tmpdir = tempfile.mkdtemp()
self._blobdir = tmpdir
self._file = tempfile.TemporaryFile()
# position: current file position
# _tpos: file position at last commit point
......@@ -1212,7 +1208,6 @@ class TmpStore:
def close(self):
self._file.close()
shutil.rmtree(self._blobdir)
def load(self, oid, version):
pos = self.index.get(oid)
......@@ -1260,13 +1255,17 @@ class TmpStore:
def loadBlob(self, oid, serial):
"""Return the filename where the blob file can be found.
"""
if not IBlobStorage.providedBy(self._storage):
raise Unsupported(
"Blobs are not supported by the underlying storage %r." %
self._storage)
filename = self._getCleanFilename(oid, serial)
if not os.path.exists(filename):
raise POSKeyError, "Not an existing blob."
return filename
def _getBlobPath(self, oid):
return os.path.join(self._blobdir,
return os.path.join(self.temporaryDirectory(),
utils.oid_repr(oid)
)
......@@ -1275,6 +1274,10 @@ class TmpStore:
"%s%s" % (utils.tid_repr(tid),
BLOB_SUFFIX,)
)
def temporaryDirectory(self):
return self._storage.temporaryDirectory()
def reset(self, position, index):
self._file.truncate(position)
self.position = position
......@@ -1288,4 +1291,3 @@ class TmpStore:
# a copy of the index here. An alternative would be to ensure that
# all callers pass copies. As is, our callers do not make copies.
self.index = index.copy()
......@@ -257,7 +257,7 @@ class DB(object):
storage.registerDB(self)
except TypeError:
storage.registerDB(self, None) # Backward compat
if (not hasattr(storage, 'tpc_vote')) and not storage.isReadOnly():
warnings.warn(
"Storage doesn't have a tpc_vote and this violates "
......
......@@ -64,7 +64,7 @@ class Blob(persistent.Persistent):
# atomically
self.readers = []
self.writers = []
__init__ = __setstate__
def __getstate__(self):
......@@ -90,17 +90,15 @@ class Blob(persistent.Persistent):
and os.path.exists(self._p_blob_uncommitted)
):
os.remove(self._p_blob_uncommitted)
super(Blob, self)._p_invalidate()
def opened(self):
return bool(self.readers or self.writers)
def closed(self, f):
# We use try/except below because another thread might remove
# the ref after we check it if the file is GCed.
for file_refs in (self.readers, self.writers):
for ref in file_refs:
if ref() is f:
......@@ -128,7 +126,7 @@ class Blob(persistent.Persistent):
readers.remove(ref)
except ValueError:
pass
self.readers.append(weakref.ref(result, destroyed))
else:
if self.readers:
......@@ -155,7 +153,7 @@ class Blob(persistent.Persistent):
writers.remove(ref)
except ValueError:
pass
self.writers.append(weakref.ref(result, destroyed))
self._p_changed = True
......@@ -233,7 +231,10 @@ class Blob(persistent.Persistent):
def _create_uncommitted_file(self):
assert self._p_blob_uncommitted is None, (
"Uncommitted file already exists.")
tempdir = os.environ.get('ZODB_BLOB_TEMPDIR', tempfile.gettempdir())
if self._p_jar:
tempdir = self._p_jar.db()._storage.temporaryDirectory()
else:
tempdir = tempfile.gettempdir()
self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
return self._p_blob_uncommitted
......
......@@ -15,66 +15,66 @@
ZODB Blob support
=================
You create a blob like this:
You create a blob like this::
>>> from ZODB.blob import Blob
>>> myblob = Blob()
A blob implements the IBlob interface:
A blob implements the IBlob interface::
>>> from ZODB.interfaces import IBlob
>>> IBlob.providedBy(myblob)
True
We can open a new blob file for reading, but it won't have any data:
We can open a new blob file for reading, but it won't have any data::
>>> myblob.open("r").read()
''
But we can write data to a new Blob by opening it for writing:
But we can write data to a new Blob by opening it for writing::
>>> f = myblob.open("w")
>>> f.write("Hi, Blob!")
If we try to open a Blob again while it is open for writing, we get an error:
If we try to open a Blob again while it is open for writing, we get an error::
>>> myblob.open("r")
Traceback (most recent call last):
...
BlobError: Already opened for writing.
We can close the file:
We can close the file::
>>> f.close()
Now we can open it for reading:
Now we can open it for reading::
>>> f2 = myblob.open("r")
And we get the data back:
And we get the data back::
>>> f2.read()
'Hi, Blob!'
If we want to, we can open it again:
If we want to, we can open it again::
>>> f3 = myblob.open("r")
>>> f3.read()
'Hi, Blob!'
But we can't open it for writing, while it is opened for reading:
But we can't open it for writing, while it is opened for reading::
>>> myblob.open("a")
Traceback (most recent call last):
...
BlobError: Already opened for reading.
Before we can write, we have to close the readers:
Before we can write, we have to close the readers::
>>> f2.close()
>>> f3.close()
Now we can open it for writing again and e.g. append data:
Now we can open it for writing again and e.g. append data::
>>> f4 = myblob.open("a")
>>> f4.write("\nBlob is fine.")
......@@ -93,7 +93,7 @@ We can't open a blob while it is open for writing:
>>> f4.close()
Now we can read it:
Now we can read it::
>>> f4a = myblob.open("r")
>>> f4a.read()
......@@ -103,14 +103,14 @@ Now we can read it:
You shouldn't need to explicitly close a blob unless you hold a reference
to it via a name. If the first line in the following test kept a reference
around via a name, the second call to open it in a writable mode would fail
with a BlobError, but it doesn't.
with a BlobError, but it doesn't::
>>> myblob.open("r+").read()
'Hi, Blob!\nBlob is fine.'
>>> f4b = myblob.open("a")
>>> f4b.close()
We can read lines out of the blob too:
We can read lines out of the blob too::
>>> f5 = myblob.open("r")
>>> f5.readline()
......@@ -119,7 +119,7 @@ We can read lines out of the blob too:
'Blob is fine.'
>>> f5.close()
We can seek to certain positions in a blob and read portions of it:
We can seek to certain positions in a blob and read portions of it::
>>> f6 = myblob.open('r')
>>> f6.seek(4)
......@@ -129,7 +129,7 @@ We can seek to certain positions in a blob and read portions of it:
'Blob!'
>>> f6.close()
We can use the object returned by a blob open call as an iterable:
We can use the object returned by a blob open call as an iterable::
>>> f7 = myblob.open('r')
>>> for line in f7:
......@@ -139,7 +139,7 @@ We can use the object returned by a blob open call as an iterable:
Blob is fine.
>>> f7.close()
We can truncate a blob:
We can truncate a blob::
>>> f8 = myblob.open('a')
>>> f8.truncate(0)
......@@ -149,30 +149,14 @@ We can truncate a blob:
''
>>> f8.close()
Blobs are always opened in binary mode:
Blobs are always opened in binary mode::
>>> f9 = myblob.open("r")
>>> f9.mode
'rb'
>>> f9.close()
We can specify the tempdir that blobs use to keep uncommitted data by
modifying the ZODB_BLOB_TEMPDIR environment variable:
>>> import os, tempfile, shutil
>>> tempdir = tempfile.mkdtemp()
>>> os.environ['ZODB_BLOB_TEMPDIR'] = tempdir
>>> myblob = Blob()
>>> len(os.listdir(tempdir))
0
>>> f = myblob.open('w')
>>> len(os.listdir(tempdir))
1
>>> f.close()
>>> shutil.rmtree(tempdir)
>>> del os.environ['ZODB_BLOB_TEMPDIR']
Some cleanup in this test is needed:
Some cleanup in this test is needed::
>>> import transaction
>>> transaction.get().abort()
......@@ -35,7 +35,7 @@ We also need a database with a blob supporting storage:
>>> blob_dir = mkdtemp()
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
Putting a Blob into a Connection works like every other object:
>>> connection = database.open()
......
=======================================
Temporary directory handling with blobs
=======================================
When creating uncommitted data files for a blob (e.g. by calling
`blob.open('w')`) we need to decide where to create them. The decision depends
on whether the blob is already stored in a database or not.
Case 1: Blobs that are not in a database yet
============================================
Let's create a new blob and open it for writing::
>>> from ZODB.blob import Blob
>>> b = Blob()
>>> w = b.open('w')
The created file is in the default temporary directory::
>>> import tempfile
>>> w.name.startswith(tempfile.gettempdir())
True
Case 2: Blobs that are in a database
====================================
For this case we instanciate a blob and add it to a database immediately.
First, we need a datatabase with blob support::
>>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.blob import BlobStorage
>>> from ZODB.DB import DB
>>> from tempfile import mkdtemp
>>> base_storage = MappingStorage("test")
>>> blob_dir = mkdtemp()
>>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(blob_storage)
Now we create a blob and put it in the database. After that we open it for
writing and expect the file to be in the blob directory::
>>> blob = Blob()
>>> connection = database.open()
>>> connection.add(blob)
>>> w = blob.open('w')
>>> w.name.startswith(blob_dir)
True
......@@ -271,6 +271,7 @@ def test_suite():
suite.addTest(doctest.DocFileSuite(
"blob_basic.txt", "blob_connection.txt", "blob_transaction.txt",
"blob_packing.txt", "blob_importexport.txt", "blob_consume.txt",
"blob_tempdir.txt",
setUp=ZODB.tests.util.setUp,
tearDown=ZODB.tests.util.tearDown,
))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment