Commit b899cc0d authored by Jim Fulton's avatar Jim Fulton

Fixed a bug in the logic to reduce the blob cache size.

Changed the default blob-cache-size-check size to 10%.

Changed the algorithm for deciding the target for blob cache
reduction. Now the target is

blob-cache-size * (100 - blob-cache-size-check)/100
parent 6adf7076
......@@ -122,7 +122,7 @@ class ClientStorage(object):
drop_cache_rather_verify=False,
username='', password='', realm=None,
blob_dir=None, shared_blob_dir=False,
blob_cache_size=None, blob_cache_size_check=100,
blob_cache_size=None, blob_cache_size_check=10,
):
"""ClientStorage constructor.
......@@ -231,7 +231,7 @@ class ClientStorage(object):
blob_cache_size_check
ZEO check size as percent of blob_cache_size. The ZEO
cache size will be checked when this many bytes have been
loaded into the cache. Defaults to 100% of the blob cache
loaded into the cache. Defaults to 10% of the blob cache
size. This option is ignored if shared_blob_dir is true.
Note that the authentication protocol is defined by the server
......@@ -472,6 +472,9 @@ class ClientStorage(object):
return
self._blob_data_bytes_loaded = 0
target = max(self._blob_cache_size - self._blob_cache_size_check, 0)
check_blob_size_thread = threading.Thread(
target=_check_blob_cache_size,
args=(self.blob_dir, self._blob_cache_size),
......@@ -1610,9 +1613,13 @@ def _accessed(filename):
cache_file_name = re.compile(r'\d+$').match
def _check_blob_cache_size(blob_dir, target):
logger = logging.getLogger(__name__+'.check_blob_cache')
logger.info("Checking blob cache size")
layout = open(os.path.join(blob_dir, ZODB.blob.LAYOUT_MARKER)
).read().strip()
if not layout == 'zeocache':
logger.critical("Invalid blob directory layout %s", layout)
raise ValueError("Invalid blob directory layout", layout)
try:
......@@ -1620,51 +1627,59 @@ def _check_blob_cache_size(blob_dir, target):
os.path.join(blob_dir, 'check_size.lock'))
except zc.lockfile.LockError:
# Someone is already cleaning up, so don't bother
logger.info("Another thread is checking the blob cache size")
return
try:
size = 0
blob_suffix = ZODB.blob.BLOB_SUFFIX
files_by_atime = BTrees.IOBTree.BTree()
for dirname in os.listdir(blob_dir):
if not cache_file_name(dirname):
continue
base = os.path.join(blob_dir, dirname)
if not os.path.isdir(base):
continue
for file_name in os.listdir(base):
if not file_name.endswith(blob_suffix):
continue
file_name = os.path.join(base, file_name)
if not os.path.isfile(file_name):
continue
stat = os.stat(file_name)
size += stat.st_size
t = int(stat.st_atime)
if t not in files_by_atime:
files_by_atime[t] = []
files_by_atime[t].append(file_name)
while size > target and files_by_atime:
for file_name in files_by_atime.pop(files_by_atime.minKey()):
lockfilename = os.path.join(os.path.dirname(file_name),
'.lock')
try:
lock = zc.lockfile.LockFile(lockfilename)
except zc.lockfile.LockError:
continue # In use, skip
try:
size = os.stat(file_name).st_size
try:
ZODB.blob.remove_committed(file_name)
except OSError, v:
pass # probably open on windows
else:
size -= size
finally:
lock.close()
size = 0
blob_suffix = ZODB.blob.BLOB_SUFFIX
files_by_atime = BTrees.IOBTree.BTree()
for dirname in os.listdir(blob_dir):
if not cache_file_name(dirname):
continue
base = os.path.join(blob_dir, dirname)
if not os.path.isdir(base):
continue
for file_name in os.listdir(base):
if not file_name.endswith(blob_suffix):
continue
file_name = os.path.join(base, file_name)
if not os.path.isfile(file_name):
continue
stat = os.stat(file_name)
size += stat.st_size
t = int(stat.st_atime)
if t not in files_by_atime:
files_by_atime[t] = []
files_by_atime[t].append(file_name)
logger.info("blob cache size: %s", size)
while size > target and files_by_atime:
for file_name in files_by_atime.pop(files_by_atime.minKey()):
lockfilename = os.path.join(os.path.dirname(file_name),
'.lock')
try:
lock = zc.lockfile.LockFile(lockfilename)
except zc.lockfile.LockError:
logger.info("Skipping locked %s",
os.path.basename(file_name))
continue # In use, skip
try:
fsize = os.stat(file_name).st_size
try:
ZODB.blob.remove_committed(file_name)
except OSError, v:
pass # probably open on windows
else:
size -= fsize
finally:
lock.close()
logger.info("reduced blob cache size: %s", size)
finally:
check_lock.close()
......
......@@ -22,8 +22,7 @@ Let's start by setting up some data:
We'll also create a client.
>>> import ZEO
>>> db = ZEO.DB(addr, blob_dir='blobs',
... blob_cache_size=4000, blob_cache_size_check=10)
>>> db = ZEO.DB(addr, blob_dir='blobs', blob_cache_size=4000)
Here, we passed a blob_cache_size parameter, which specifies a target
blob cache size. This is not a hard limit, but rather a target. It
......@@ -66,7 +65,7 @@ directory.
>>> db.storage._check_blob_size_thread.join()
>>> cache_size('blobs') < 6000
>>> cache_size('blobs') < 5000
True
If we read all of the blobs, data will be downloaded again, as
......@@ -80,7 +79,7 @@ target:
>>> db.storage._check_blob_size_thread.join()
>>> cache_size('blobs') < 6000
>>> cache_size('blobs') < 5000
True
>>> for i in range(1, 101):
......@@ -97,7 +96,7 @@ target:
>>> db.storage._check_blob_size_thread.join()
>>> cache_size('blobs') < 6000
>>> cache_size('blobs') < 5000
True
>>> for i in range(1, 101):
......@@ -107,7 +106,7 @@ target:
>>> db.storage._check_blob_size_thread.join()
>>> cache_size('blobs') < 6000
>>> cache_size('blobs') < 5000
True
Now let see if we can stress things a bit. We'll create many clients
......@@ -116,8 +115,7 @@ provoke problems:
>>> import threading, random
>>> def run():
... db = ZEO.DB(addr, blob_dir='blobs',
... blob_cache_size=4000, blob_cache_size_check=10)
... db = ZEO.DB(addr, blob_dir='blobs', blob_cache_size=4000)
... conn = db.open()
... for i in range(300):
... time.sleep(0)
......@@ -140,7 +138,7 @@ provoke problems:
>>> for thread in threads:
... thread.join()
>>> cache_size('blobs') < 6000
>>> cache_size('blobs') < 5000
True
.. cleanup
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment