Commit edde1fe3 authored by Julien Muchembled

Rewrite storage cache of client

- Stop using a list of (in)validated tids (hence the removal of RevisionIndex),
  because it can't work in all cases and would even cause memory leaks.
  For example, this bug could lead to ConflictError with a single client.
  Fixing it also requires that database backends always return the next serial.
- Several performance improvements. The most important one applies when the
  latest version of an object is cached: it inherits the access counter of the
  previous one (for the same oid), which in turn gets its counter reset.
- Do not waste CPU evaluating the real size taken by an entry in memory.
  Just use 'len' on the value (which is always pickle data, i.e. a string).
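
To make the second point concrete, here is a minimal sketch of the
counter-inheritance rule; the names are illustrative only, not the actual
ClientCache code from this commit:

    # Sketch: storing the latest revision of an oid transfers the access
    # counter from the previous head revision, whose counter is reset.
    def store_latest(revisions, oid, data, tid):
        item_list = revisions.setdefault(oid, [])
        counter = 0
        if item_list:
            prev = item_list[-1]
            counter, prev['counter'] = prev['counter'], 0
        item_list.append({'tid': tid, 'data': data, 'counter': counter})

    revisions = {}
    store_latest(revisions, 'oid1', 'pickle-v1', 't1')
    revisions['oid1'][-1]['counter'] = 5          # simulate 5 cache hits
    store_latest(revisions, 'oid1', 'pickle-v2', 't2')
    assert revisions['oid1'][-1]['counter'] == 5  # inherited by new head
    assert revisions['oid1'][-2]['counter'] == 0  # reset on previous head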

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2711 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 240c94b3
...@@ -42,12 +42,11 @@ from neo.client.handlers import storage, master
from neo.lib.dispatcher import Dispatcher, ForgottenPacket from neo.lib.dispatcher import Dispatcher, ForgottenPacket
from neo.client.poll import ThreadedPoll, psThreadedPoll from neo.client.poll import ThreadedPoll, psThreadedPoll
from neo.client.iterator import Iterator from neo.client.iterator import Iterator
from neo.client.mq import MQ from neo.client.cache import ClientCache
from neo.client.pool import ConnectionPool from neo.client.pool import ConnectionPool
from neo.lib.util import u64, parseMasterList from neo.lib.util import u64, parseMasterList
from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED from neo.lib.profiling import profiler_decorator, PROFILING_ENABLED
from neo.lib.debug import register as registerLiveDebugger from neo.lib.debug import register as registerLiveDebugger
from neo.client.mq_index import RevisionIndex
from neo.client.container import ThreadContainer, TransactionContainer from neo.client.container import ThreadContainer, TransactionContainer
if PROFILING_ENABLED: if PROFILING_ENABLED:
...@@ -93,9 +92,7 @@ class Application(object):
# no self-assigned UUID, primary master will supply us one # no self-assigned UUID, primary master will supply us one
self.uuid = None self.uuid = None
self.mq_cache = MQ() self._cache = ClientCache()
self.cache_revision_index = RevisionIndex()
self.mq_cache.addIndex(self.cache_revision_index)
self.new_oid_list = [] self.new_oid_list = []
self.last_oid = '\0' * 8 self.last_oid = '\0' * 8
self.storage_event_handler = storage.StorageEventHandler(self) self.storage_event_handler = storage.StorageEventHandler(self)
...@@ -432,20 +429,17 @@ class Application(object):
self._load_lock_acquire() self._load_lock_acquire()
try: try:
try: result = self._loadFromCache(oid, serial, tid)
return self._loadFromCache(oid, serial, tid) if not result:
except KeyError: result = self._loadFromStorage(oid, serial, tid)
pass self._cache_lock_acquire()
data, start_serial, end_serial = self._loadFromStorage(oid, serial, try:
tid) self._cache.store(oid, *result)
self._cache_lock_acquire() finally:
try: self._cache_lock_release()
self.mq_cache[(oid, start_serial)] = data, end_serial if result[0] == '':
finally: raise NEOStorageCreationUndoneError(dump(oid))
self._cache_lock_release() return result
if data == '':
raise NEOStorageCreationUndoneError(dump(oid))
return data, start_serial, end_serial
finally: finally:
self._load_lock_release() self._load_lock_release()
...@@ -479,24 +473,17 @@ class Application(object):
return data, tid, next_tid return data, tid, next_tid
@profiler_decorator @profiler_decorator
def _loadFromCache(self, oid, at_tid, before_tid): def _loadFromCache(self, oid, at_tid=None, before_tid=None):
""" """
Load from local cache, raising KeyError if not found. Load from local cache, return None if not found.
""" """
self._cache_lock_acquire() self._cache_lock_acquire()
try: try:
if at_tid is not None: if at_tid:
tid = at_tid result = self._cache.load(oid, at_tid + '*')
elif before_tid is not None: assert not result or result[1] == at_tid
tid = self.cache_revision_index.getSerialBefore(oid, return result
before_tid) return self._cache.load(oid, before_tid)
else:
tid = self.cache_revision_index.getLatestSerial(oid)
if tid is None:
raise KeyError
# Raises KeyError on miss
data, next_tid = self.mq_cache[(oid, tid)]
return (data, tid, next_tid)
finally: finally:
self._cache_lock_release() self._cache_lock_release()
...@@ -808,14 +795,7 @@ class Application(object):
# Update cache # Update cache
self._cache_lock_acquire() self._cache_lock_acquire()
try: try:
mq_cache = self.mq_cache cache = self._cache
update = mq_cache.update
def updateNextSerial(value):
data, next_tid = value
assert next_tid is None, (dump(oid), dump(base_tid),
dump(next_tid))
return (data, tid)
get_baseTID = txn_context['object_base_serial_dict'].get
for oid, data in txn_context['data_dict'].iteritems(): for oid, data in txn_context['data_dict'].iteritems():
if data is None: if data is None:
# this is just a remain of # this is just a remain of
...@@ -823,16 +803,10 @@ class Application(object):
# was modified). # was modified).
continue continue
# Update ex-latest value in cache # Update ex-latest value in cache
base_tid = get_baseTID(oid) cache.invalidate(oid, tid)
try: if data:
update((oid, base_tid), updateNextSerial)
except KeyError:
pass
if data == '':
self.cache_revision_index.invalidate([oid], tid)
else:
# Store in cache with no next_tid # Store in cache with no next_tid
mq_cache[(oid, tid)] = (data, None) cache.store(oid, data, tid, None)
finally: finally:
self._cache_lock_release() self._cache_lock_release()
txn_container.delete(transaction) txn_container.delete(transaction)
...@@ -1105,7 +1079,7 @@ class Application(object):
# by a pack), so don't bother invalidating on other clients. # by a pack), so don't bother invalidating on other clients.
self._cache_lock_acquire() self._cache_lock_acquire()
try: try:
self.mq_cache.clear() self._cache.clear()
finally: finally:
self._cache_lock_release() self._cache_lock_release()
......
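
A note on the 'at_tid + "*"' idiom in _loadFromCache above: TIDs are
fixed-length 8-byte strings compared lexicographically, so appending any
extra character yields a key strictly greater than at_tid but still smaller
than the next possible TID. Passing it as a "before" bound therefore returns
exactly the revision whose tid equals at_tid. A small sketch of the ordering
argument (values illustrative):

    at_tid   = '\x00\x00\x00\x00\x00\x00\x00\x05'
    next_tid = '\x00\x00\x00\x00\x00\x00\x00\x06'
    # '*' could be any byte: the first 8 bytes already decide the order.
    assert at_tid < at_tid + '*' < next_tid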
############################################################################## ##############################################################################
# #
# Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved. # Copyright (c) 2011 Nexedi SARL and Contributors. All Rights Reserved.
# Yoshinori Okuji <yo@nexedi.com> # Julien Muchembled <jm@nexedi.com>
# #
# WARNING: This program as such is intended to be used by professional # WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential # programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs # consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial # End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software # guarantees and support are strongly advised to contract a Free Software
# Service Company # Service Company
# #
# This program is Free Software; you can redistribute it and/or # This program is Free Software; you can redistribute it and/or
...@@ -17,140 +17,41 @@
# #
# This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of # but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details. # GNU General Public License for more details.
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# #
############################################################################## ##############################################################################
""" import math
Multi-Queue Cache Algorithm.
""" class CacheItem(object):
from math import log __slots__ = ('oid', 'tid', 'next_tid', 'data',
'counter', 'level', 'expire',
class MQIndex(object): 'prev', 'next')
"""
Virtual base class for MQ cache external indexes. def __repr__(self):
""" s = ''
def clear(self): for attr in self.__slots__:
""" try:
Called when MQ is cleared. value = getattr(self, attr)
""" if value:
raise NotImplementedError if attr in ('prev', 'next'):
s += ' %s=<...>' % attr
def remove(self, key): continue
""" elif attr == 'data':
Called when key's value is removed from cache, and key is pushed to value = '...'
history buffer. s += ' %s=%r' % (attr, value)
""" except AttributeError:
raise NotImplementedError pass
return '<%s%s>' % (self.__class__.__name__, s)
def add(self, key):
""" class ClientCache(object):
Called when key is added into cache. """In-memory pickle cache based on Multi-Queue cache algorithm
It is either a new key, or a known key coming back from history
buffer.
"""
raise NotImplementedError
class Element(object):
"""
This class defines an element of a FIFO buffer.
"""
pass
class FIFO(object):
"""
This class implements a FIFO buffer.
"""
def __init__(self):
self.head = None
self.tail = None
self._len = 0
self.prev = None
self.data = None
self.next = None
self.level = None
self.counter = None
self.value = None
self.element = None
self.key = None
self.expire_time = None
def __len__(self):
return self._len
def append(self):
element = Element()
element.next = None
element.prev = self.tail
if self.tail is not None:
self.tail.next = element
self.tail = element
if self.head is None:
self.head = element
self._len += 1
return element
def shift(self):
element = self.head
if element is None:
return None
del self[element]
del element.next
del element.prev
return element
def __delitem__(self, element):
if element.next is None:
self.tail = element.prev
else:
element.next.prev = element.prev
if element.prev is None:
self.head = element.next
else:
element.prev.next = element.next
self._len -= 1
class Data(object):
"""
Data for each element in a FIFO buffer.
"""
pass
def sizeof(o):
"""This function returns the estimated size of an object."""
if isinstance(o, tuple):
return sum((sizeof(s) for s in o))
elif o is None:
# XXX: unknown size (arch pointer ?)
return 0
else:
return len(o)+16
class MQ(object):
"""
This class manages cached data by a variant of Multi-Queue.
This class caches various sizes of objects. Here are some considerations:
- Expired objects are not really deleted immediately. But if GC is invoked too often,
it degrades the performance significantly.
- If large objects are cached, the number of cached objects decreases. This might affect
the cache hit ratio. It might be better to tweak a buffer level according to the size of
an object.
- Stored values must be strings.
- The size calculation is not accurate.
Quick description of Multi-Queue algorithm: Quick description of Multi-Queue algorithm:
- There are multiple "regular" queues, plus a history queue - There are multiple "regular" queues, plus a history queue
...@@ -159,224 +60,179 @@ class MQ(object):
longer lifespan) longer lifespan)
-> The more often an object is accessed, the higher lifespan it will -> The more often an object is accessed, the higher lifespan it will
have have
- Upon a cache miss, the oldest entry of first non-empty queue is
transferred to history queue
- Upon cache or history hit, object frequency is increased and object - Upon cache or history hit, object frequency is increased and object
might get moved to longer-lived queue might get moved to longer-lived queue
- Each access "ages" objects in cache, and an aging object is moved to - Each access "ages" objects in cache, and an aging object is moved to
shorter-lived queue as it ages without being accessed shorter-lived queue as it ages without being accessed, or in the
history queue if it's really too old.
""" """
def __init__(self, life_time=10000, buffer_levels=9, __slots__ = ('_life_time', '_max_history_size', '_max_size',
max_history_size=100000, max_size=20*1024*1024): '_queue_list', '_oid_dict', '_time', '_size', '_history_size')
self._index_list = []
def __init__(self, life_time=10000, max_history_size=100000,
max_size=20*1024*1024):
self._life_time = life_time self._life_time = life_time
self._buffer_levels = buffer_levels
self._max_history_size = max_history_size self._max_history_size = max_history_size
self._max_size = max_size self._max_size = max_size
self.clear() self.clear()
def addIndex(self, index, reindex=True):
"""
Add an index to this cache.
index
Object implementing methods from MQIndex class.
reindex (True)
If True, give all existing keys as new to index.
"""
if reindex:
# Index existing entries
add = index.add
for key in self._data:
add(key)
self._index_list.append(index)
def _mapIndexes(self, method_id, args=(), kw={}):
for index in self._index_list:
getattr(index, method_id)(*args, **kw)
def clear(self): def clear(self):
self._history_buffer = FIFO() """Reset cache"""
self._cache_buffers = [] self._queue_list = [None] # first is history
for level in range(self._buffer_levels): self._oid_dict = {}
self._cache_buffers.append(FIFO())
self._data = {}
self._time = 0 self._time = 0
self._size = 0 self._size = 0
self._mapIndexes('clear') self._history_size = 0
def has_key(self, key): def _iterQueue(self, level):
if key in self._data: """for debugging purpose"""
data = self._data[key] if level < len(self._queue_list):
if data.level >= 0: item = head = self._queue_list[level]
return 1 if item:
return 0 while 1:
yield item
__contains__ = has_key item = item.next
if item is head:
def fetch(self, key): break
"""
Fetch a value associated with the key.
"""
data = self._data[key]
if data.level >= 0:
value = data.value
self._size -= sizeof(value)
self.store(key, value)
return value
raise KeyError(key)
__getitem__ = fetch
def get(self, key, d=None):
try:
return self.fetch(key)
except KeyError:
return d
def _evict(self, key):
"""
Evict an element to the history buffer.
"""
self._mapIndexes('remove', (key, ))
data = self._data[key]
self._size -= sizeof(data.value)
del self._cache_buffers[data.level][data.element]
element = self._history_buffer.append()
data.level = -1
data.element = element
del data.value
del data.expire_time
element.data = data
if len(self._history_buffer) > self._max_history_size:
element = self._history_buffer.shift()
del self._data[element.data.key]
def store(self, key, value):
cache_buffers = self._cache_buffers
def _add(self, item):
level = item.level
try: try:
data = self._data[key] head = self._queue_list[level]
except KeyError: except IndexError:
counter = 1 assert len(self._queue_list) == level
self._mapIndexes('add', (key, )) self._queue_list.append(item)
item.prev = item.next = item
else: else:
level, element, counter = data.level, data.element, data.counter + 1 if head:
if level >= 0: item.prev = tail = head.prev
del cache_buffers[level][element] tail.next = head.prev = item
item.next = head
else: else:
del self._history_buffer[element] self._queue_list[level] = item
self._mapIndexes('add', (key, )) item.prev = item.next = item
if level:
item.expire = self._time + self._life_time
else:
self._size -= len(item.data)
item.data = None
if self._history_size < self._max_history_size:
self._history_size += 1
else:
self._remove(head)
item_list = self._oid_dict[head.oid]
item_list.remove(head)
if not item_list:
del self._oid_dict[head.oid]
def _remove(self, item):
level = item.level
if level is not None:
item.level = level - 1
next = item.next
if next is item:
self._queue_list[level] = next = None
else:
item.prev.next = next
next.prev = item.prev
if self._queue_list[level] is item:
self._queue_list[level] = next
return next
def _fetched(self, item, _log=math.log):
self._remove(item)
item.counter = counter = item.counter + 1
# XXX It might be better to adjust the level according to the object # XXX It might be better to adjust the level according to the object
# size. # size.
level = min(int(log(counter, 2)), self._buffer_levels - 1) item.level = 1 + int(_log(counter, 2))
element = cache_buffers[level].append() self._add(item)
data = Data()
data.key = key self._time = time = self._time + 1
data.expire_time = self._time + self._life_time for head in self._queue_list[1:]:
data.level = level if head and head.expire < time:
data.element = element self._remove(head)
data.value = value self._add(head)
data.counter = counter break
element.data = data
self._data[key] = data def _load(self, oid, before_tid=None):
self._size += sizeof(value) item_list = self._oid_dict.get(oid)
del value if item_list:
if before_tid:
self._time += 1 for item in reversed(item_list):
if item.tid < before_tid:
# Expire old elements. next_tid = item.next_tid
time = self._time if next_tid and next_tid < before_tid:
for level, cache_buffer in enumerate(cache_buffers): break
head = cache_buffer.head return item
if head is not None and head.data.expire_time < time: else:
del cache_buffer[head] item = item_list[-1]
data = head.data if not item.next_tid:
if level > 0: return item
new_level = level - 1
element = cache_buffers[new_level].append() def load(self, oid, before_tid=None):
element.data = data """Return a revision of oid that was current before given tid"""
data.expire_time = time + self._life_time item = self._load(oid, before_tid)
data.level = new_level if item:
data.element = element data = item.data
else: if data is not None:
self._evict(data.key) self._fetched(item)
return data, item.tid, item.next_tid
# Limit the size.
def store(self, oid, data, tid, next_tid):
"""Store a new data record in the cache"""
size = len(data)
max_size = self._max_size max_size = self._max_size
if self._size > max_size: if size < max_size:
for cache_buffer in cache_buffers: item = self._load(oid, next_tid)
while self._size > max_size: if item:
element = cache_buffer.head assert not (item.data or item.level)
if element is None: assert item.tid == tid and item.next_tid == next_tid
break self._history_size -= 1
self._evict(element.data.key) else:
if self._size <= max_size: item = CacheItem()
break item.oid = oid
item.tid = tid
__setitem__ = store item.next_tid = next_tid
item.counter = 0
def invalidate(self, key): item.level = None
if key in self._data: try:
data = self._data[key] item_list = self._oid_dict[oid]
if data.level >= 0: except KeyError:
del self._cache_buffers[data.level][data.element] self._oid_dict[oid] = [item]
self._evict(key) else:
return if next_tid:
raise KeyError, "%s was not found in the cache" % (key, ) for i, x in enumerate(item_list):
if tid < x.tid:
__delitem__ = invalidate break
item_list.insert(i, item)
def update(self, key, callback): else:
""" if item_list:
Update entry without changing its level. prev = item_list[-1]
""" item.counter = prev.counter
data = self._data[key] prev.counter = 0
if data.level < 0: if prev.level > 1:
raise KeyError(key) self._fetched(prev)
data.value = callback(data.value) item_list.append(item)
item.data = data
# Here is a test. self._fetched(item)
if __name__ == '__main__': self._size += size
import hotshot, hotshot.stats if max_size < self._size:
for head in self._queue_list[1:]:
def test(): while head:
cache = MQ(life_time=100, buffer_levels=9, max_history_size=10000, next = self._remove(head)
max_size=2*1024*1024) head.level = 0
self._add(head)
cache[0] = 'foo' if self._size <= max_size:
assert cache[0] == 'foo', '0 should be present' return
del cache[0] head = next
assert cache.get(0) is None, '0 should not be present'
def invalidate(self, oid, tid):
for i in xrange(10000): """Mark data record as being valid only up to given tid"""
assert cache.get(i) is None, '%d should not be present' % i try:
item = self._oid_dict[oid][-1]
for i in xrange(10000): except KeyError:
cache[i] = str(i) pass
assert cache.get(i) == str(i), '%d does not exist' % i else:
if item.next_tid is None:
for i in xrange(10000 - 100 - 1): item.next_tid = tid
assert cache.get(i) is None, '%d should not be present' % i
for i in xrange(10):
cache[i] = str(i)
for _ in xrange(1000):
for i in xrange(10):
assert cache.get(i) == str(i), '%d does not exist' % i
for i in xrange(10, 500):
cache[i] = str(i)
for i in xrange(10):
assert cache.get(i) == str(i), '%d does not exist' % i
prof = hotshot.Profile("mq.prof")
prof.runcall(test)
prof.close()
stats = hotshot.stats.load("mq.prof")
stats.strip_dirs()
stats.sort_stats('time', 'calls')
stats.print_stats(20)
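
Based only on the calls visible in this diff (store, load, invalidate,
clear), the new ClientCache could be exercised as follows; this is a sketch,
not a test shipped with the commit, and the TID values are illustrative:

    from neo.client.cache import ClientCache

    cache = ClientCache()
    oid = '\0' * 8
    tid1 = '\0' * 7 + '\x01'
    tid2 = '\0' * 7 + '\x02'
    # store(oid, data, tid, next_tid): next_tid is None while the
    # revision is still the current one.
    cache.store(oid, 'pickle-v1', tid1, None)
    assert cache.load(oid) == ('pickle-v1', tid1, None)
    # An invalidation closes the current revision...
    cache.invalidate(oid, tid2)
    # ...so loading the latest revision now misses,
    assert cache.load(oid) is None
    # but loading "before" the invalidating TID still hits.
    assert cache.load(oid, tid2) == ('pickle-v1', tid1, tid2)
    cache.clear()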
...@@ -121,11 +121,12 @@ class PrimaryNotificationsHandler(BaseHandler):
app = self.app app = self.app
app._cache_lock_acquire() app._cache_lock_acquire()
try: try:
# ZODB required a dict with oid as key, so create it invalidate = app._cache.invalidate
app.cache_revision_index.invalidate(oid_list, tid) for oid in oid_list:
invalidate(oid, tid)
db = app.getDB() db = app.getDB()
if db is not None: if db is not None:
db.invalidate(tid, dict.fromkeys(oid_list, tid)) db.invalidate(tid, oid_list)
finally: finally:
app._cache_lock_release() app._cache_lock_release()
......
#
# Copyright (C) 2010-2011 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.client.mq import MQIndex
class RevisionIndex(MQIndex):
"""
This cache index allows accessing a specific revision of a cached object.
It requires cache key to be a 2-tuple, composed of oid and revision.
Note: it is expected that rather few revisions are held in cache, with few
lookups for old revisions, so they are held in a simple sorted list.
Note2: all methods here must be called with cache lock acquired.
"""
def __init__(self):
# key: oid
# value: tid list, from highest to lowest
self._oid_dict = {}
# key: oid
# value: tid list, from lowest to highest
self._invalidated = {}
def clear(self):
self._oid_dict.clear()
self._invalidated.clear()
def remove(self, key):
oid_dict = self._oid_dict
oid, tid = key
tid_list = oid_dict[oid]
tid_list.remove(tid)
if not tid_list:
# No more serial known for this object, drop entirely
del oid_dict[oid]
self._invalidated.pop(oid, None)
def add(self, key):
oid_dict = self._oid_dict
oid, tid = key
try:
serial_list = oid_dict[oid]
except KeyError:
serial_list = oid_dict[oid] = []
else:
assert tid not in serial_list
if not(serial_list) or tid > serial_list[0]:
serial_list.insert(0, tid)
else:
serial_list.insert(0, tid)
serial_list.sort(reverse=True)
invalidated = self._invalidated
try:
tid_list = invalidated[oid]
except KeyError:
pass
else:
try:
tid_list.remove(tid)
except ValueError:
pass
else:
if not tid_list:
del invalidated[oid]
def invalidate(self, oid_list, tid):
"""
Mark object invalidated by given transaction.
Must be called with increasing TID values (which is standard for
ZODB).
"""
invalidated = self._invalidated
oid_dict = self._oid_dict
for oid in (x for x in oid_list if x in oid_dict):
try:
tid_list = invalidated[oid]
except KeyError:
tid_list = invalidated[oid] = []
assert not tid_list or tid > tid_list[-1], (dump(oid), dump(tid),
dump(tid_list[-1]))
tid_list.append(tid)
def getSerialBefore(self, oid, tid):
"""
Get the first tid in cache whose value is lower than the given tid.
"""
# WARNING: return-intensive to save on indentation
oid_list = self._oid_dict.get(oid)
if oid_list is None:
# Unknown oid
return None
for result in oid_list:
if result < tid:
# Candidate found
break
else:
# No candidate in cache.
return None
# Check if there is a chance that an intermediate revision would
# exist, while missing from cache.
try:
inv_tid_list = self._invalidated[oid]
except KeyError:
return result
# Remember: inv_tid_list is sorted in ascending order.
for inv_tid in inv_tid_list:
if tid < inv_tid:
# We don't care about invalidations past requested TID.
break
elif result < inv_tid < tid:
# An invalidation was received between candidate revision,
# and before requested TID: there is a matching revision we
# don't know of, so we cannot answer.
return None
return result
def getLatestSerial(self, oid):
"""
Get the latest tid for given object.
"""
result = self._oid_dict.get(oid)
if result is not None:
result = result[0]
try:
tid_list = self._invalidated[oid]
except KeyError:
pass
else:
if result < tid_list[-1]:
# An invalidation happened from a transaction later than
# our most recent view of this object, so we cannot answer.
result = None
return result
def getSerialList(self, oid):
"""
Get the list of all serials cache knows about for given object.
"""
return self._oid_dict.get(oid, [])[:]
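
One plausible reading of "can't work in all cases" from the commit message,
walked through on the removed class (TID strings are illustrative, chosen so
they compare lexicographically):

    index = RevisionIndex()
    index.add(('oid1', 't1'))          # revision t1 of oid1 is cached
    index.invalidate(['oid1'], 't2')   # t2 exists but is not cached
    # Before t2, the cached t1 is provably the right answer:
    assert index.getSerialBefore('oid1', 't2') == 't1'
    # Before any t3 > t2, the index cannot rule out the uncached t2,
    # so it must degrade to a cache miss:
    assert index.getSerialBefore('oid1', 't3') is None
    assert index.getLatestSerial('oid1') is None

Note also that invalidate() appends to per-oid lists in _invalidated that are
only pruned when the matching revision is later cached or the oid is fully
evicted, which is presumably the memory leak the commit message mentions.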
...@@ -275,37 +275,22 @@ class BTreeDatabaseManager(DatabaseManager):
def _getObject(self, oid, tid=None, before_tid=None): def _getObject(self, oid, tid=None, before_tid=None):
tserial = self._obj.get(oid) tserial = self._obj.get(oid)
if tserial is None: if tserial is not None:
result = None
else:
if tid is None: if tid is None:
if before_tid is None: try:
try: if before_tid is None:
tid = tserial.maxKey() tid = tserial.maxKey()
except ValueError: else:
tid = None tid = tserial.maxKey(before_tid - 1)
else: except ValueError:
before_tid -= 1 return
try: result = tserial.get(tid)
tid = tserial.maxKey(before_tid)
except ValueError:
tid = None
if tid is None:
result = None
else:
result = tserial.get(tid, None)
if result: if result:
compression, checksum, data, value_serial = result try:
if before_tid is None: next_serial = tserial.minKey(tid + 1)
except ValueError:
next_serial = None next_serial = None
else: return (tid, next_serial) + result
try:
next_serial = tserial.minKey(tid + 1)
except ValueError:
next_serial = None
result = (tid, next_serial, compression, checksum, data,
value_serial)
return result
def doSetPartitionTable(self, ptid, cell_list, reset): def doSetPartitionTable(self, ptid, cell_list, reset):
pt = self._pt pt = self._pt
......
...@@ -255,6 +255,8 @@ class DatabaseManager(object):
else: else:
serial, next_serial, compression, checksum, data, data_serial = \ serial, next_serial, compression, checksum, data, data_serial = \
result result
assert before_tid is None or next_serial is None or \
before_tid <= next_serial
if data is None and resolve_data: if data is None and resolve_data:
try: try:
_, compression, checksum, data = self._getObjectData(oid, _, compression, checksum, data = self._getObjectData(oid,
......
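
The assertion added above makes explicit the contract the commit message
refers to: any backend _getObject must now always report the serial that
follows the returned revision. A minimal sketch of the asserted invariant
(function name is illustrative):

    def check_next_serial(tid, next_serial, before_tid=None):
        # The returned revision is the newest one strictly before
        # before_tid, so any later revision starts at or after it.
        if before_tid is not None:
            assert tid < before_tid
            assert next_serial is None or before_tid <= next_serial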
...@@ -335,50 +335,30 @@ class MySQLDatabaseManager(DatabaseManager):
def _getObject(self, oid, tid=None, before_tid=None): def _getObject(self, oid, tid=None, before_tid=None):
q = self.query q = self.query
partition = self._getPartition(oid) partition = self._getPartition(oid)
sql = """SELECT serial, compression, checksum, value, value_serial
FROM obj
WHERE partition = %d
AND oid = %d""" % (partition, oid)
if tid is not None: if tid is not None:
r = q("""SELECT serial, compression, checksum, value, value_serial sql += ' AND serial = %d' % tid
FROM obj
WHERE partition = %d AND oid = %d AND serial = %d""" \
% (partition, oid, tid))
try:
serial, compression, checksum, data, value_serial = r[0]
next_serial = None
except IndexError:
return None
elif before_tid is not None: elif before_tid is not None:
r = q("""SELECT serial, compression, checksum, value, value_serial sql += ' AND serial < %d ORDER BY serial DESC LIMIT 1' % before_tid
FROM obj
WHERE partition = %d
AND oid = %d AND serial < %d
ORDER BY serial DESC LIMIT 1""" \
% (partition, oid, before_tid))
try:
serial, compression, checksum, data, value_serial = r[0]
except IndexError:
return None
r = q("""SELECT serial FROM obj_short
WHERE partition = %d
AND oid = %d AND serial >= %d
ORDER BY serial LIMIT 1""" \
% (partition, oid, before_tid))
try:
next_serial = r[0][0]
except IndexError:
next_serial = None
else: else:
# XXX I want to express "HAVING serial = MAX(serial)", but # XXX I want to express "HAVING serial = MAX(serial)", but
# MySQL does not use an index for a HAVING clause! # MySQL does not use an index for a HAVING clause!
r = q("""SELECT serial, compression, checksum, value, value_serial sql += ' ORDER BY serial DESC LIMIT 1'
FROM obj r = q(sql)
WHERE partition = %d AND oid = %d try:
ORDER BY serial DESC LIMIT 1""" \ serial, compression, checksum, data, value_serial = r[0]
% (partition, oid)) except IndexError:
try: return None
serial, compression, checksum, data, value_serial = r[0] r = q("""SELECT serial FROM obj_short
next_serial = None WHERE partition = %d AND oid = %d AND serial > %d
except IndexError: ORDER BY serial LIMIT 1""" % (partition, oid, serial))
return None try:
next_serial = r[0][0]
except IndexError:
next_serial = None
return serial, next_serial, compression, checksum, data, value_serial return serial, next_serial, compression, checksum, data, value_serial
def doSetPartitionTable(self, ptid, cell_list, reset): def doSetPartitionTable(self, ptid, cell_list, reset):
......