Commit 4988f141 authored by Christian Theune's avatar Christian Theune

- Fixed load_lock strategy and testing it roughly

- Made the ClientStorage cache directory the same structure as BlobStorage
parent 7ec117d6
......@@ -315,7 +315,8 @@ class ClientStorage(object):
self._lock = threading.Lock()
# XXX need to check for POSIX-ness here
if (os.stat(blob_dir).st_mode & 077) != 0:
if blob_dir is not None and \
(os.stat(blob_dir).st_mode & 077) != 0:
log2('Blob dir %s has insecure mode setting' % blob_dir,
level=logging.WARNING)
......@@ -923,24 +924,32 @@ class ClientStorage(object):
os.unlink(blobfilename)
return serials
def _getBlobPath(self, oid):
return os.path.join(self.blob_dir,
utils.oid_repr(oid)
)
def _getDirtyFilename(self, oid, serial):
"""Generate an intermediate filename for two-phase commit.
"""
return self._getCleanFilename(oid, serial) + "." + BLOB_DIRTY
def _getCleanFilename(self, oid, tid):
return os.path.join(self.blob_dir,
"%s-%s%s" % (utils.oid_repr(oid),
utils.tid_repr(tid),
return os.path.join(self._getBlobPath(oid),
"%s%s" % (utils.tid_repr(tid),
BLOB_SUFFIX,)
)
def _do_load_blob(self, oid, serial):
def _do_load_blob(self, oid, serial, version):
"""Do the actual loading from the RPC server."""
blob_filename = self._getCleanFilename(oid, serial)
if self._server is None:
raise ClientDisconnected()
targetpath = self._getBlobPath(oid)
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
# We write to a temporary file first, so we do not accidentally
# allow half-baked copies of this blob be loaded
tempfilename = self._getDirtyFilename(oid, serial)
......@@ -1002,7 +1011,7 @@ class ClientStorage(object):
return blob_filename
# Otherwise we download and use that
return self._do_load_blob(oid, serial)
return self._do_load_blob(oid, serial, version)
finally:
# When done we remove the download lock ...
lock.release()
......
......@@ -21,6 +21,7 @@ import asyncore
import tempfile
import unittest
import logging
import shutil
# ZODB test support
import ZODB
......@@ -127,14 +128,16 @@ class GenericTests(
self._pids = [pid]
self._servers = [adminaddr]
self._conf_path = path
self.blob_cache_dir = tempfile.mkdtemp() # This is the blob cache for ClientStorage
self._storage = ClientStorage(zport, '1', cache_size=20000000,
min_disconnect_poll=0.5, wait=1,
wait_timeout=60)
wait_timeout=60, blob_dir=self.blob_cache_dir)
self._storage.registerDB(DummyDB(), None)
def tearDown(self):
self._storage.close()
os.remove(self._conf_path)
shutil.rmtree(self.blob_cache_dir)
for server in self._servers:
forker.shutdown_zeo_server(server)
if hasattr(os, 'waitpid'):
......@@ -199,12 +202,13 @@ class MappingStorageTests(GenericTests):
class BlobAdaptedFileStorageTests(GenericTests):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
def setUp(self):
self.blobdir = tempfile.mkdtemp()
self.blobdir = tempfile.mkdtemp() # This is the blob directory on the ZEO server
self.filestorage = tempfile.mktemp()
super(BlobAdaptedFileStorageTests, self).setUp()
def tearDown(self):
import shutil
shutil.rmtree(self.blobdir)
os.unlink(self.filestorage)
super(BlobAdaptedFileStorageTests, self).tearDown()
def getConfig(self):
......@@ -215,7 +219,7 @@ class BlobAdaptedFileStorageTests(GenericTests):
path %s
</filestorage>
</blobstorage>
""" % (self.blobdir, tempfile.mktemp())
""" % (self.blobdir, self.filestorage)
def checkStoreBlob(self):
from ZODB.utils import oid_repr, tid_repr
......@@ -253,12 +257,33 @@ class BlobAdaptedFileStorageTests(GenericTests):
self.assertEqual(somedata, open(filename).read())
def checkLoadBlob(self):
from ZODB.Blobs.Blob import Blob
from ZODB.tests.StorageTestBase import zodb_pickle, ZERO, \
handle_serials
oid = self._storage.new_oid()
serial = ZERO
import transaction
version = ''
filename = self._storage._getCleanFilename(oid, serial)
somedata = 'a' * 10
blob = Blob()
bd_fh = blob.open('w')
bd_fh.write(somedata)
bd_fh.close()
tfname = bd_fh.name
oid = self._storage.new_oid()
data = zodb_pickle(blob)
t = transaction.Transaction()
try:
self._storage.tpc_begin(t)
r1 = self._storage.storeBlob(oid, ZERO, data, tfname, '', t)
r2 = self._storage.tpc_vote(t)
serial = handle_serials(oid, r1, r2)
self._storage.tpc_finish(t)
except:
self._storage.tpc_abort(t)
raise
class Dummy:
def __init__(self):
......@@ -276,17 +301,18 @@ class BlobAdaptedFileStorageTests(GenericTests):
def __setitem__(self, k, v):
self.added.append(k)
super(dict, self).__setitem__(k, v)
super(statusdict, self).__setitem__(k, v)
def __delitem__(self, k):
self.removed.append(k)
super(dict, self).__delitem__(k)
super(statusdict, self).__delitem__(k)
# ensure that we do locking properly
filename = self._storage._getCleanFilename(oid, serial)
thestatuslock = self._storage.blob_status_lock = Dummy()
thebloblock = Dummy()
def getBlobLock(self):
def getBlobLock():
return thebloblock
# override getBlobLock to test that locking is performed
......@@ -301,8 +327,8 @@ class BlobAdaptedFileStorageTests(GenericTests):
self.assertEqual(thebloblock.acquired, 1)
self.assertEqual(thebloblock.released, 1)
self.assertEqual(thestatusdict.added, (oid, serial))
self.assertEqual(thestatusdict.removed, (oid, serial))
self.assertEqual(thestatusdict.added, [(oid, serial)])
self.assertEqual(thestatusdict.removed, [(oid, serial)])
test_classes = [FileStorageTests, MappingStorageTests,
BlobAdaptedFileStorageTests]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment