Commit 081c502b authored by Julien Muchembled's avatar Julien Muchembled

client: new cache algorithm

parent c84c48ee
......@@ -15,26 +15,46 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import math
from bisect import insort
from BTrees.LOBTree import LOBTree
from gc import get_referents
from struct import Struct
from sys import getsizeof
s = Struct('d')
pack_double = s.pack
unpack_double = s.unpack
s = Struct('q')
pack_long = s.pack
unpack_long = s.unpack
del s
def internalSizeOfBTree(x):
module = type(x).__module__
seen = set()
left = [x]
size = 0
while left:
x = left.pop()
seen.add(x)
size += getsizeof(x)
left.extend(x for x in get_referents(x)
if type(x).__module__ == module and x not in seen)
return size
class CacheItem(object):
__slots__ = ('oid', 'tid', 'next_tid', 'data',
'counter', 'level', 'expire',
'prev', 'next')
__slots__ = 'oid', 'tid', 'next_tid', 'data', 'counter', 'expire'
def __repr__(self):
s = ''
for attr in self.__slots__:
try:
value = getattr(self, attr)
if value:
if attr in ('prev', 'next'):
s += ' %s=<...>' % attr
continue
elif attr == 'data':
value = '...'
if attr == 'data':
s += ' len(%s)=%s' % (attr, len(value))
continue
if attr == 'expire':
value = unpack_double(pack_long(value))[0]
s += ' %s=%r' % (attr, value)
except AttributeError:
pass
......@@ -44,261 +64,186 @@ class CacheItem(object):
return self.tid < other.tid
class ClientCache(object):
"""In-memory pickle cache based on Multi-Queue cache algorithm
"""In-memory pickle cache based on LFRU cache algorithm
Multi-Queue algorithm for Second Level Buffer Caches:
https://www.usenix.org/event/usenix01/full_papers/zhou/zhou_html/index.html
This Least Frequent Recently Used implementation is adapted to handle
records of different sizes. This is possible thanks to a B+Tree: the use
of such a complex structure for a cache is quite unusual for a cache
but we use a C implementation that's relatively fast compared to the
cost of a cache miss.
Quick description:
- There are multiple "regular" queues, plus a history queue
- The queue to store an object in depends on its access frequency
- The queue an object is in defines its lifespan (higher-index queue eq.
longer lifespan)
-> The more often an object is accessed, the higher lifespan it will
have
- Upon cache or history hit, object frequency is increased and object
might get moved to longer-lived queue
- Each access "ages" objects in cache, and an aging object is moved to
shorter-lived queue as it ages without being accessed, or in the
history queue if it's really too old.
- The history queue only contains items with counter > 0
This algorithm adapts well regardless its maximum allowed size,
without any tweak.
"""
__slots__ = ('max_size', '_life_time', '_max_history_size',
'_queue_list', '_oid_dict', '_time', '_size', '_history_size',
__slots__ = ('max_size', '_oid_dict', '_size', '_added', '_items',
'_nhit', '_nmiss')
def __init__(self, life_time=10000, max_history_size=100000,
max_size=20*1024*1024):
self._life_time = life_time
self._max_history_size = max_history_size
def __init__(self, max_size=20*1024*1024):
self.max_size = max_size
self.clear()
def clear(self):
"""Reset cache"""
self._queue_list = [None] # first is history
self._oid_dict = {}
self._time = 0
self._size = 0
self._history_size = 0
self._nhit = self._nmiss = 0
self._size = self._nhit = self._nmiss = 0
# Make sure to never produce negative keys, else
# we could not manipulate them when encoded as integers.
self._added = self.max_size
self._items = LOBTree()
def __repr__(self):
nload = self._nhit + self._nmiss
return ("<%s #loads=%s #oids=%s size=%s time=%s queue_length=%r"
" (life_time=%s max_history_size=%s max_size=%s)>") % (
return ("<%s #loads=%s #oids=%s size=%s #items=%s"
" btree_overhead=%s (max_size=%s)>") % (
self.__class__.__name__,
nload and '%s (%.3g%% hit)' % (nload, 100 * self._nhit / nload),
len(self._oid_dict), self._size, self._time,
[self._history_size] + [
sum(1 for _ in self._iterQueue(level))
for level in xrange(1, len(self._queue_list))],
self._life_time, self._max_history_size, self.max_size)
def _iterQueue(self, level):
"""for debugging purpose"""
if level < len(self._queue_list):
# Lockless iteration of the queue.
# XXX: In case of race condition, the result is wrong but at least,
# it won't loop endlessly. If one want to collect accurate
# statistics, a lock should be used.
expire = 0
item = self._queue_list[level]
while item and item.level == level and expire < item.expire:
yield item
expire = item.expire
item = item.next
def _remove_from_oid_dict(self, item):
item_list = self._oid_dict[item.oid]
item_list.remove(item)
if not item_list:
del self._oid_dict[item.oid]
def _add(self, item):
level = item.level
try:
head = self._queue_list[level]
except IndexError:
assert len(self._queue_list) == level
self._queue_list.append(item)
item.prev = item.next = item
else:
if head:
item.prev = tail = head.prev
tail.next = head.prev = item
item.next = head
else:
self._queue_list[level] = item
item.prev = item.next = item
if level:
item.expire = self._time + self._life_time
else:
self._empty(item)
self._history_size += 1
if self._max_history_size < self._history_size:
self._remove(head)
self._remove_from_oid_dict(head)
def _empty(self, item):
self._size -= len(item.data)
item.data = None
def _remove(self, item):
level = item.level
if level is not None:
if level:
item.level = level - 1
else:
self._history_size -= 1
next = item.next
if next is item:
self._queue_list[level] = next = None
else:
item.prev.next = next
next.prev = item.prev
if self._queue_list[level] is item:
self._queue_list[level] = next
return next
def _fetched(self, item, _log=math.log):
self._remove(item)
item.counter = counter = item.counter + 1
# XXX It might be better to adjust the level according to the object
# size. See commented factor for example.
item.level = 1 + int(_log(counter, 2)
# * (1.01 - len(item.data) / self.max_size)
)
self._add(item)
self._time = time = self._time + 1
for head in self._queue_list[1:]:
if head and head.expire < time:
self._remove(head)
if head.level or head.counter:
self._add(head)
else:
self._empty(head)
self._remove_from_oid_dict(head)
break
len(self._oid_dict), self._size, len(self._items),
internalSizeOfBTree(self._items),
self.max_size)
def _load(self, oid, before_tid=None):
item_list = self._oid_dict.get(oid)
if item_list:
if before_tid:
for item in reversed(item_list):
for item in item_list:
if item.tid < before_tid:
next_tid = item.next_tid
if next_tid and next_tid < before_tid:
break
return item
else:
item = item_list[-1]
item = item_list[0]
if not item.next_tid:
return item
def load(self, oid, before_tid=None):
def load(self, oid, before_tid):
"""Return a revision of oid that was current before given tid"""
item = self._load(oid, before_tid)
if item:
data = item.data
if data is not None:
self._nhit += 1
self._fetched(item)
return data, item.tid, item.next_tid
del self._items[item.expire]
item.counter += 1
self._add(item)
self._nhit += 1
return item.data, item.tid, item.next_tid
self._nmiss += 1
def _forget(self, item):
items = self._oid_dict[item.oid]
items.remove(item)
if not items:
del self._oid_dict[item.oid]
self._size -= len(item.data)
del self._items[item.expire]
def _add(self, item):
# The initial idea was to compute keys as follows:
# (added - size) * item.counter
# However, after running for a long time, this tends to degenerate:
# - size become more and more negligible over time
# - objects that are most often accessed become impossible to remove,
# making the cache too slow to adapt after a change of workload
# - 64 bits is not enough
# This was solved in several ways, by using the following formula:
# min_key - size + (added - min_key) * item.counter
# and doubles.
# BTrees does not have an optimized class for doubles so we encode
# them as integers, which preserve the same order as long as they're
# positive (hence some extra tweak to avoid negative numbers in some
# rare cases) and it becomes easier to compute the next double
# (+1 instead of libm.nextafter). The downside is that conversion
# between double and long is a bit expensive in Python.
added = self._added
items = self._items
try:
x = items.minKey()
except ValueError:
x = added
else:
# Most of the time, the smallest key is smaller than `added`. In
# the very rare case it isn't, make sure to produce a positive key.
x = min(added, unpack_double(pack_long(x))[0])
size = len(item.data)
expire = unpack_long(pack_double(
x - size + (added - x) * item.counter
))[0]
for x in items.iterkeys(expire):
if x != expire:
break
expire += 1
self._added = added + size
item.expire = expire
items[expire] = item
def store(self, oid, data, tid, next_tid):
"""Store a new data record in the cache"""
size = len(data)
max_size = self.max_size
if size < max_size:
item = self._load(oid, next_tid)
if item:
# We don't handle late invalidations for cached oids, because
# the caller is not supposed to explicitly asks for tids after
# app.last_tid (and the cache should be empty when app.last_tid
# is still None).
assert item.tid == tid, (item, tid)
if item.level: # already stored
assert item.next_tid == next_tid and item.data == data
return
assert not item.data
# Possible case of late invalidation.
item.next_tid = next_tid
i = 0
try:
items = self._oid_dict[oid]
except KeyError:
items = self._oid_dict[oid] = []
counter = 1
else:
item = CacheItem()
item.oid = oid
item.tid = tid
item.next_tid = next_tid
item.counter = 0
item.level = None
try:
item_list = self._oid_dict[oid]
except KeyError:
self._oid_dict[oid] = [item]
for item in items:
if item.tid < tid:
assert None is not item.next_tid <= tid
break
if item.tid == tid:
# We don't handle late invalidations for cached oids,
# because the caller is not supposed to explicitly asks
# for tids after app.last_tid (and the cache should be
# empty when app.last_tid is still None).
assert item.next_tid == next_tid and item.data == data
return
i += 1
if next_tid:
counter = 1
else:
if next_tid:
insort(item_list, item)
else:
prev = item_list[-1]
assert prev.next_tid <= tid, (prev, item)
item.counter = prev.counter
if prev.level:
prev.counter = 0
if prev.level > 1:
self._fetched(prev)
item_list.append(item)
else:
self._remove(prev)
item_list[-1] = item
counter = item.counter
if counter != 1:
del self._items[item.expire]
item.counter = 1
self._add(item)
item = CacheItem()
item.oid = oid
item.tid = tid
item.next_tid = next_tid
item.data = data
self._fetched(item)
item.counter = counter
items.insert(i, item)
self._size += size
if max_size < self._size:
for head in self._queue_list[1:]:
while head:
next = self._remove(head)
if head.counter:
head.level = 0
self._add(head)
else:
self._empty(head)
self._remove_from_oid_dict(head)
if self._size <= max_size:
return
head = next
self._add(item)
while max_size < self._size:
items = self._items
self._forget(items[items.minKey()])
def invalidate(self, oid, tid):
"""Mark data record as being valid only up to given tid"""
try:
item = self._oid_dict[oid][-1]
except KeyError:
pass
else:
items = self._oid_dict.get(oid)
if items:
item = items[0]
if item.next_tid is None:
item.next_tid = tid
else:
assert item.next_tid <= tid, (item, oid, tid)
def clear_current(self):
for oid, item_list in self._oid_dict.items():
item = item_list[-1]
for oid, items in self._oid_dict.items():
item = items[0]
if item.next_tid is None:
if item.level:
self._empty(item)
self._remove(item)
del item_list[-1]
# We don't preserve statistics of removed items. This could be
# done easily when previous versions are cached, by copying
# counters, but it would not be fair for other oids, so it's
# probably not worth it.
if not item_list:
del self._oid_dict[oid]
self._forget(item)
def test(self):
orig_add = ClientCache._add
def _add(cache, item):
orig_add(cache, item)
self.assertLessEqual(0, cache._items.minKey())
ClientCache._add = _add
cache = ClientCache()
repr(cache)
self.assertEqual(cache.load(1, 10), None)
......@@ -324,24 +269,26 @@ def test(self):
self.assertEqual(cache.load(1, 20), ('15', 15, 20))
cache.store(1, '10', 10, 15)
cache.store(1, '20', 20, 21)
self.assertEqual([5, 10, 15, 20], [x.tid for x in cache._oid_dict[1]])
self.assertEqual([20, 15, 10, 5], [x.tid for x in cache._oid_dict[1]])
self.assertRaises(AssertionError, cache.store, 1, '20', 20, None)
repr(cache)
map(repr, cache._queue_list)
# Test late invalidations.
cache.clear()
cache.store(1, '10*', 10, None)
cache.max_size = cache._size
cache.store(2, '10', 10, 15)
self.assertEqual(cache._queue_list[0].oid, 1)
cache.store(2, '15', 15, None)
self.assertEqual(cache._queue_list[2].oid, 2)
data = '10', 10, 15
cache.store(1, *data)
self.assertEqual(cache.load(1, 15), data)
self.assertEqual(1, cache._history_size)
cache = ClientCache(10)
data1 = "x", 1, None
cache.store(1, "x", 1, None)
repr(*cache._oid_dict[1])
data = "xxxxx", 1, None
cache.store(2, *data)
cache.store(3, *data)
self.assertEqual(cache.load(1, None), data1)
self.assertEqual(cache.load(2, None), None) # bigger records removed faster
self.assertEqual(cache.load(3, None), data)
self.assertEqual(cache._size, 6)
cache.clear_current()
self.assertEqual(0, cache._history_size)
for oid in 0, 1:
cache.store(oid, 'x', 1, None)
cache.load(oid, None)
cache.load(oid, None)
cache.load(0, None)
if __name__ == '__main__':
import unittest
......
......@@ -931,7 +931,7 @@ class Test(NEOThreadedTest):
ll()
x2._p_deactivate()
# Remove last version of x from cache
cache._remove(cache._oid_dict[x2._p_oid].pop())
cache._forget(cache._oid_dict[x2._p_oid][0])
with ll, Patch(cluster.client, _loadFromStorage=break_after):
t = self.newThread(x2._p_activate)
ll()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment