Commit 19b43534 authored by bescoto's avatar bescoto

Added CachedIndexableProcessor to rorpiter


git-svn-id: http://svn.savannah.nongnu.org/svn/rdiff-backup@279 2b77aa54-bcbc-44c9-a7ec-4f6cf2b41109
parent 791d4425
...@@ -440,5 +440,53 @@ class CacheIndexable: ...@@ -440,5 +440,53 @@ class CacheIndexable:
def get(self, index): def get(self, index):
"""Return element with index index from cache""" """Return element with index index from cache"""
try: return self.cache_dict[index] try: return self.cache_dict[index]
except KeyError: return None except KeyError:
assert index > self.cache_indicies[0], index
return None
class CachedIndexableProcessor:
"""Reorder indicies, then feed into some function in order
Use this class when you want to run some function on a stream of
objects in index order. However, the objects may be slightly out
of index order. This class will cache a certain number, and then
reorder them.
An error is signaled if the indicies arrive too out of order.
"""
def __init__(self, function, cache_size):
"""CIP initializer. function is called on every elem."""
self.function = function
self.cache_size = cache_size
self.cache_indicies = []
self.cache_dict = {}
def process(self, elem):
"""Call CIP (and underlying self.function) on indexed elem"""
index = elem.index
self.cache_dict[index] = elem
if self.cache_indicies and index <= self.cache_indicies[-1]:
assert index > self.cache_indicies[0]
self.cache_indicies.append(index)
self.cache_indicies.sort() # Ack, n log n, should be log n!!!
else: self.cache_indicies.append(index)
if len(self.cache_indicies) > self.cache_size:
first_index = self.cache_indicies[0]
first_elem = self.cache_dict[first_index]
del self.cache_indicies[0]
del self.cache_dict[first_index]
self.function(first_elem)
__call__ = process
def close(self):
"""Flush cache by running function on remaining elems"""
while self.cache_indicies:
index = self.cache_indicies[0]
elem = self.cache_dict[index]
del self.cache_indicies[0]
del self.cache_dict[index]
self.function(elem)
...@@ -280,7 +280,8 @@ class CacheIndexableTest(unittest.TestCase): ...@@ -280,7 +280,8 @@ class CacheIndexableTest(unittest.TestCase):
assert ci.get((3,)) == self.d[(3,)] assert ci.get((3,)) == self.d[(3,)]
assert ci.get((4,)) == self.d[(4,)] assert ci.get((4,)) == self.d[(4,)]
assert ci.get((1,)) is None assert ci.get((3,5)) is None
self.assertRaises(AssertionError, ci.get, (1,))
def testEqual(self): def testEqual(self):
"""Make sure CI doesn't alter properties of underlying iter""" """Make sure CI doesn't alter properties of underlying iter"""
...@@ -288,7 +289,30 @@ class CacheIndexableTest(unittest.TestCase): ...@@ -288,7 +289,30 @@ class CacheIndexableTest(unittest.TestCase):
l1 = list(self.get_iter()) l1 = list(self.get_iter())
l2 = list(rorpiter.CacheIndexable(iter(l1), 10)) l2 = list(rorpiter.CacheIndexable(iter(l1), 10))
assert l1 == l2, (l1, l2) assert l1 == l2, (l1, l2)
class CacheIndexableProcessorTest(unittest.TestCase):
def function(self, elem):
"""Called by CIP on each elem"""
self.l.append(elem)
def testReorder(self):
"""Test the reordering abilities of CIP"""
CIP = rorpiter.CachedIndexableProcessor(self.function, 3)
in_list = [rorpiter.IndexedTuple((x,), (x,)) for x in range(6)]
self.l = []
CIP(in_list[0])
CIP(in_list[2])
CIP(in_list[1])
CIP(in_list[5])
CIP(in_list[3])
CIP(in_list[4])
self.assertRaises(AssertionError, CIP, in_list[0])
CIP.close()
assert self.l == in_list, (self.l, in_list)
if __name__ == "__main__": unittest.main() if __name__ == "__main__": unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment