Commit 52386318 authored by Andreas Jung's avatar Andreas Jung

integrated ExtendedPathIndex functionality

parent 75b79d6e
...@@ -55,9 +55,9 @@ class PathIndex(Persistent, SimpleItem): ...@@ -55,9 +55,9 @@ class PathIndex(Persistent, SimpleItem):
'help': ('PathIndex','PathIndex_Settings.stx')}, 'help': ('PathIndex','PathIndex_Settings.stx')},
) )
query_options = ("query", "level", "operator") query_options = ("query", "level", "operator", "depth", "navtree")
def __init__(self,id,caller=None): def ___init__(self,id,caller=None):
self.id = id self.id = id
self.operators = ('or','and') self.operators = ('or','and')
self.useOperator = 'or' self.useOperator = 'or'
...@@ -87,10 +87,77 @@ class PathIndex(Persistent, SimpleItem): ...@@ -87,10 +87,77 @@ class PathIndex(Persistent, SimpleItem):
if level > self._depth: if level > self._depth:
self._depth = level self._depth = level
def numObjects(self):
""" return the number distinct values """
return len(self._unindex)
def indexSize(self):
""" return the number of indexed objects"""
return len(self)
def __len__(self):
return self._length()
def hasUniqueValuesFor(self, name):
"""has unique values for column name"""
return name == self.id
def uniqueValues(self, name=None, withLength=0):
""" needed to be consistent with the interface """
return self._index.keys()
def getEntryForObject(self, docid, default=_marker):
""" Takes a document ID and returns all the information
we have on that specific object.
"""
try:
return self._unindex[docid]
except KeyError:
# XXX Why is default ignored?
return None
def __init__(self, id, extra=None, caller=None):
""" ExtendedPathIndex supports indexed_attrs """
self.___init__( id, caller)
def get(o, k, default):
if isinstance(o, dict):
return o.get(k, default)
else:
return getattr(o, k, default)
attrs = get(extra, 'indexed_attrs', None)
if attrs is None:
return
if isinstance(attrs, str):
attrs = attrs.split(',')
attrs = filter(None, [a.strip() for a in attrs])
if attrs:
# We only index the first attribute so snip off the rest
self.indexed_attrs = tuple(attrs[:1])
def index_object(self, docid, obj ,threshold=100): def index_object(self, docid, obj ,threshold=100):
""" hook for (Z)Catalog """ """ hook for (Z)Catalog """
f = getattr(obj, self.id, None) # PathIndex first checks for an attribute matching its id and
# falls back to getPhysicalPath only when failing to get one.
# The presence of 'indexed_attrs' overrides this behavior and
# causes indexing of the custom attribute.
attrs = getattr(self, 'indexed_attrs', None)
if attrs:
index = attrs[0]
else:
index = self.id
f = getattr(obj, index, None)
if f is not None: if f is not None:
if safe_callable(f): if safe_callable(f):
try: try:
...@@ -100,7 +167,7 @@ class PathIndex(Persistent, SimpleItem): ...@@ -100,7 +167,7 @@ class PathIndex(Persistent, SimpleItem):
else: else:
path = f path = f
if not isinstance(path, (StringType, TupleType)): if not isinstance(path, (str, tuple)):
raise TypeError('path value must be string or tuple of strings') raise TypeError('path value must be string or tuple of strings')
else: else:
try: try:
...@@ -108,15 +175,25 @@ class PathIndex(Persistent, SimpleItem): ...@@ -108,15 +175,25 @@ class PathIndex(Persistent, SimpleItem):
except AttributeError: except AttributeError:
return 0 return 0
if isinstance(path, (ListType, TupleType)): if isinstance(path, (list, tuple)):
path = '/'+ '/'.join(path[1:]) path = '/'+ '/'.join(path[1:])
comps = filter(None, path.split('/')) comps = filter(None, path.split('/'))
# Make sure we reindex properly when path change
if self._unindex.has_key(docid) and self._unindex.get(docid) != path:
self.unindex_object(docid)
if not self._unindex.has_key(docid): if not self._unindex.has_key(docid):
if hasattr(self, '_migrate_length'):
self._migrate_length()
self._length.change(1) self._length.change(1)
for i in range(len(comps)): for i in range(len(comps)):
self.insertEntry(comps[i], docid, i) self.insertEntry(comps[i], docid, i)
# Add terminator
self.insertEntry(None, docid, len(comps)-1)
self._unindex[docid] = path self._unindex[docid] = path
return 1 return 1
...@@ -124,15 +201,17 @@ class PathIndex(Persistent, SimpleItem): ...@@ -124,15 +201,17 @@ class PathIndex(Persistent, SimpleItem):
""" hook for (Z)Catalog """ """ hook for (Z)Catalog """
if not self._unindex.has_key(docid): if not self._unindex.has_key(docid):
LOG.error('Attempt to unindex nonexistent document with id %s' LOG.error('Attempt to unindex nonexistent document'
% docid) ' with id %s' % docid)
return return
comps = self._unindex[docid].split('/') # There is an assumption that paths start with /
path = self._unindex[docid]
for level in range(len(comps[1:])): if not path.startswith('/'):
comp = comps[level+1] path = '/'+path
comps = path.split('/')
def unindex(comp, level, docid=docid):
try: try:
self._index[comp][level].remove(docid) self._index[comp][level].remove(docid)
...@@ -142,13 +221,26 @@ class PathIndex(Persistent, SimpleItem): ...@@ -142,13 +221,26 @@ class PathIndex(Persistent, SimpleItem):
if not self._index[comp]: if not self._index[comp]:
del self._index[comp] del self._index[comp]
except KeyError: except KeyError:
LOG.error('Attempt to unindex document with id %s failed' LOG.error('Attempt to unindex document'
% docid) ' with id %s failed' % docid)
return
for level in range(len(comps[1:])):
comp = comps[level+1]
unindex(comp, level)
# Remove the terminator
level = len(comps[1:])
comp = None
unindex(comp, level-1)
if hasattr(self, '_migrate_length'):
self._migrate_length()
self._length.change(-1) self._length.change(-1)
del self._unindex[docid] del self._unindex[docid]
def search(self, path, default_level=0): def search(self, path, default_level=0, depth=-1, navtree=0):
""" """
path is either a string representing a path is either a string representing a
relative URL or a part of a relative URL or relative URL or a part of a relative URL or
...@@ -158,29 +250,58 @@ class PathIndex(Persistent, SimpleItem): ...@@ -158,29 +250,58 @@ class PathIndex(Persistent, SimpleItem):
level < 0 not implemented yet level < 0 not implemented yet
""" """
if isinstance(path, StringType): if isinstance(path, str):
level = default_level startlevel = default_level
else: else:
level = int(path[1]) startlevel = int(path[1])
path = path[0] path = path[0]
comps = filter(None, path.split('/')) comps = filter(None, path.split('/'))
if len(comps) == 0: # Make sure that we get depth = 1 if in navtree mode
return IISet(self._unindex.keys()) # unless specified otherwise
if level >= 0: if depth == -1:
results = [] depth = 0 or navtree
for i in range(len(comps)):
comp = comps[i]
if not self._index.has_key(comp): return IISet()
if not self._index[comp].has_key(level+i): return IISet()
results.append( self._index[comp][level+i] )
res = results[0] if len(comps) == 0:
for i in range(1,len(results)): if not depth and not navtree:
res = intersection(res,results[i]) return IISet(self._unindex.keys())
return res
if startlevel >= 0:
pathset = None # Same as pathindex
navset = None # For collecting siblings along the way
depthset = None # For limiting depth
if navtree and depth and \
self._index.has_key(None) and \
self._index[None].has_key(startlevel):
navset = self._index[None][startlevel]
for level in range(startlevel, startlevel+len(comps) + depth):
if level-startlevel < len(comps):
comp = comps[level-startlevel]
if not self._index.has_key(comp) or not self._index[comp].has_key(level):
# Navtree is inverse, keep going even for nonexisting paths
if navtree:
pathset = IISet()
else:
return IISet()
else:
pathset = intersection(pathset, self._index[comp][level])
if navtree and depth and \
self._index.has_key(None) and \
self._index[None].has_key(level+depth):
navset = union(navset, intersection(pathset, self._index[None][level+depth]))
if level-startlevel >= len(comps) or navtree:
if self._index.has_key(None) and self._index[None].has_key(level):
depthset = union(depthset, intersection(pathset, self._index[None][level]))
if navtree:
return union(depthset, navset) or IISet()
else:
return intersection(pathset,depthset) or IISet()
else: else:
results = IISet() results = IISet()
...@@ -197,17 +318,6 @@ class PathIndex(Persistent, SimpleItem): ...@@ -197,17 +318,6 @@ class PathIndex(Persistent, SimpleItem):
results = union(results,ids) results = union(results,ids)
return results return results
def numObjects(self):
""" return the number distinct values """
return len(self._unindex)
def indexSize(self):
""" return the number of indexed objects"""
return len(self)
def __len__(self):
return self._length()
def _apply_index(self, request, cid=''): def _apply_index(self, request, cid=''):
""" hook for (Z)Catalog """ hook for (Z)Catalog
'request' -- mapping type (usually {"path": "..." } 'request' -- mapping type (usually {"path": "..." }
...@@ -222,6 +332,8 @@ class PathIndex(Persistent, SimpleItem): ...@@ -222,6 +332,8 @@ class PathIndex(Persistent, SimpleItem):
level = record.get("level",0) level = record.get("level",0)
operator = record.get('operator',self.useOperator).lower() operator = record.get('operator',self.useOperator).lower()
depth = getattr(record, 'depth',-1) # Set to 0 or navtree in search - use getattr to get 0 value
navtree = record.get('navtree',0)
# depending on the operator we use intersection of union # depending on the operator we use intersection of union
if operator == "or": set_func = union if operator == "or": set_func = union
...@@ -229,7 +341,7 @@ class PathIndex(Persistent, SimpleItem): ...@@ -229,7 +341,7 @@ class PathIndex(Persistent, SimpleItem):
res = None res = None
for k in record.keys: for k in record.keys:
rows = self.search(k,level) rows = self.search(k,level, depth, navtree)
res = set_func(res,rows) res = set_func(res,rows)
if res: if res:
...@@ -237,32 +349,21 @@ class PathIndex(Persistent, SimpleItem): ...@@ -237,32 +349,21 @@ class PathIndex(Persistent, SimpleItem):
else: else:
return IISet(), (self.id,) return IISet(), (self.id,)
def hasUniqueValuesFor(self, name):
"""has unique values for column name"""
return name == self.id
def uniqueValues(self, name=None, withLength=0):
""" needed to be consistent with the interface """
return self._index.keys()
def getIndexSourceNames(self): def getIndexSourceNames(self):
""" return names of indexed attributes """ """ return names of indexed attributes """
return ('getPhysicalPath', )
def getEntryForObject(self, docid, default=_marker): # By default PathIndex advertises getPhysicalPath even
""" Takes a document ID and returns all the information # though the logic in index_object is different.
we have on that specific object.
"""
try: try:
return self._unindex[docid] return tuple(self.indexed_attrs)
except KeyError: except AttributeError:
# XXX Why is default ignored? return ('getPhysicalPath',)
return None
index_html = DTMLFile('dtml/index', globals()) index_html = DTMLFile('dtml/index', globals())
manage_workspace = DTMLFile('dtml/managePathIndex', globals()) manage_workspace = DTMLFile('dtml/managePathIndex', globals())
manage_addPathIndexForm = DTMLFile('dtml/addPathIndex', globals()) manage_addPathIndexForm = DTMLFile('dtml/addPathIndex', globals())
def manage_addPathIndex(self, id, REQUEST=None, RESPONSE=None, URL3=None): def manage_addPathIndex(self, id, REQUEST=None, RESPONSE=None, URL3=None):
......
PathIndex by Zope Corporation +
extensions by Plone Solutions (former ExtendedPathIndex)
This is an index that supports depth limiting, and the ability to build a
structure usable for navtrees and sitemaps. The actual navtree implementations
are not (and should not) be in this Product, this is the index implementation
only.
Features
- Can construct a site map with a single catalog query
- Can construct a navigation tree with a single catalog query
Usage:
- catalog(path='some/path') - search for all objects below some/path
- catalog(path={'query' : 'some/path', 'depth' : 2 ) - search for all
objects below some/path but only down to a depth of 2
- catalog(path={'query' : 'some/path', 'navtree' : 1 ) - search for all
objects below some/path for rendering a navigation tree. This includes
all objects below some/path up to a depth of 1 and all parent objects.
Credits
- Zope Corporation for the initial PathIndex code
- Helge Tesdal from Plone Solutions for the ExtendedPathIndex implementation
License
This software is released under the ZPL license.
from Testing import ZopeTestCase
from Products.PluginIndexes.PathIndex.PathIndex import PathIndex
class Dummy:
meta_type="foo"
def __init__(self, path):
self.path = path
def getPhysicalPath(self):
return self.path.split('/')
def __str__(self):
return '<Dummy: %s>' % self.path
__repr__ = __str__
class PathIndexTestCase(ZopeTestCase.ZopeTestCase):
def _setup(self):
self._index = PathIndex( 'path' )
self._values = {
1 : Dummy("/aa/aa/aa/1.html"),
2 : Dummy("/aa/aa/bb/2.html"),
3 : Dummy("/aa/aa/cc/3.html"),
4 : Dummy("/aa/bb/aa/4.html"),
5 : Dummy("/aa/bb/bb/5.html"),
6 : Dummy("/aa/bb/cc/6.html"),
7 : Dummy("/aa/cc/aa/7.html"),
8 : Dummy("/aa/cc/bb/8.html"),
9 : Dummy("/aa/cc/cc/9.html"),
10 : Dummy("/bb/aa/aa/10.html"),
11 : Dummy("/bb/aa/bb/11.html"),
12 : Dummy("/bb/aa/cc/12.html"),
13 : Dummy("/bb/bb/aa/13.html"),
14 : Dummy("/bb/bb/bb/14.html"),
15 : Dummy("/bb/bb/cc/15.html"),
16 : Dummy("/bb/cc/aa/16.html"),
17 : Dummy("/bb/cc/bb/17.html"),
18 : Dummy("/bb/cc/cc/18.html")
}
def _populateIndex(self):
for k, v in self._values.items():
self._index.index_object( k, v )
class ExtendedPathIndexTestCase(PathIndexTestCase):
def _setup(self):
self._index = PathIndex( 'path' )
self._values = {
1 : Dummy("/1.html"),
2 : Dummy("/aa/2.html"),
3 : Dummy("/aa/aa/3.html"),
4 : Dummy("/aa/aa/aa/4.html"),
5 : Dummy("/aa/bb/5.html"),
6 : Dummy("/aa/bb/aa/6.html"),
7 : Dummy("/aa/bb/bb/7.html"),
8 : Dummy("/aa"),
9 : Dummy("/aa/bb"),
10 : Dummy("/bb/10.html"),
11 : Dummy("/bb/bb/11.html"),
12 : Dummy("/bb/bb/bb/12.html"),
13 : Dummy("/bb/aa/13.html"),
14 : Dummy("/bb/aa/aa/14.html"),
15 : Dummy("/bb/bb/aa/15.html"),
16 : Dummy("/bb"),
17 : Dummy("/bb/bb"),
18 : Dummy("/bb/aa")
}
#
# IndexedAttrs tests
#
import os, sys
if __name__ == '__main__':
execfile(os.path.join(sys.path[0], 'framework.py'))
from Testing import ZopeTestCase
from Products.ZCatalog.ZCatalog import ZCatalog
from OFS.SimpleItem import SimpleItem
class Record:
def __init__(self, **kw):
self.__dict__.update(kw)
class Dummy(SimpleItem):
def __init__(self, id):
self.id = id
def getCustomPath(self):
return ('', 'custom', 'path')
def getStringPath(self):
return '/string/path'
class TestIndexedAttrs(ZopeTestCase.ZopeTestCase):
def afterSetUp(self):
self.catalog = ZCatalog('catalog')
self.folder._setObject('dummy', Dummy('dummy'))
self.dummy = self.folder.dummy
self.physical_path = '/'.join(self.dummy.getPhysicalPath())
self.custom_path = '/'.join(self.dummy.getCustomPath())
self.string_path = self.dummy.getStringPath()
def addIndex(self, id='path', extra=None):
self.catalog.addIndex(id, 'PathIndex', extra)
return self.catalog.Indexes[id]
def testAddIndex(self):
self.catalog.addIndex('path', 'PathIndex')
try:
self.catalog.Indexes['path']
except KeyError:
self.fail('Failed to create index')
def testDefaultIndexedAttrs(self):
# By default we don't have indexed_attrs at all
idx = self.addIndex()
self.failIf(hasattr(idx, 'indexed_attrs'))
def testDefaultIndexSourceNames(self):
# However, getIndexSourceName returns 'getPhysicalPath'
idx = self.addIndex()
self.assertEqual(idx.getIndexSourceNames(), ('getPhysicalPath',))
def testDefaultIndexObject(self):
# By default PathIndex indexes getPhysicalPath
idx = self.addIndex()
idx.index_object(123, self.dummy)
self.assertEqual(idx.getEntryForObject(123), self.physical_path)
def testDefaultSearchObject(self):
# We can find the object in the catalog by physical path
self.addIndex()
self.catalog.catalog_object(self.dummy)
self.assertEqual(len(self.catalog(path=self.physical_path)), 1)
def testDefaultSearchDictSyntax(self):
# PathIndex supports dictionary syntax for queries
self.addIndex()
self.catalog.catalog_object(self.dummy)
self.assertEqual(len(self.catalog(path={'query': self.physical_path})), 1)
def testExtraAsRecord(self):
# 'extra' can be a record type object
idx = self.addIndex(extra=Record(indexed_attrs='getCustomPath'))
self.assertEqual(idx.indexed_attrs, ('getCustomPath',))
def testExtraAsMapping(self):
# or a dictionary
idx = self.addIndex(extra={'indexed_attrs': 'getCustomPath'})
self.assertEqual(idx.indexed_attrs, ('getCustomPath',))
def testCustomIndexSourceNames(self):
# getIndexSourceName returns the indexed_attrs
idx = self.addIndex(extra={'indexed_attrs': 'getCustomPath'})
self.assertEqual(idx.getIndexSourceNames(), ('getCustomPath',))
def testCustomIndexObject(self):
# PathIndex indexes getCustomPath
idx = self.addIndex(extra={'indexed_attrs': 'getCustomPath'})
idx.index_object(123, self.dummy)
self.assertEqual(idx.getEntryForObject(123), self.custom_path)
def testCustomSearchObject(self):
# We can find the object in the catalog by custom path
self.addIndex(extra={'indexed_attrs': 'getCustomPath'})
self.catalog.catalog_object(self.dummy)
self.assertEqual(len(self.catalog(path=self.custom_path)), 1)
def testStringIndexObject(self):
# PathIndex accepts a path as tuple or string
idx = self.addIndex(extra={'indexed_attrs': 'getStringPath'})
idx.index_object(123, self.dummy)
self.assertEqual(idx.getEntryForObject(123), self.string_path)
def testStringSearchObject(self):
# And we can find the object in the catalog again
self.addIndex(extra={'indexed_attrs': 'getStringPath'})
self.catalog.catalog_object(self.dummy)
self.assertEqual(len(self.catalog(path=self.string_path)), 1)
def testIdIndexObject(self):
# PathIndex prefers an attribute matching its id over getPhysicalPath
idx = self.addIndex(id='getId')
idx.index_object(123, self.dummy)
self.assertEqual(idx.getEntryForObject(123), 'dummy')
def testIdIndexObject(self):
# Using indexed_attr overrides this behavior
idx = self.addIndex(id='getId', extra={'indexed_attrs': 'getCustomPath'})
idx.index_object(123, self.dummy)
self.assertEqual(idx.getEntryForObject(123), self.custom_path)
def testListIndexedAttr(self):
# indexed_attrs can be a list
idx = self.addIndex(id='getId', extra={'indexed_attrs': ['getCustomPath', 'foo']})
# only the first attribute is used
self.assertEqual(idx.getIndexSourceNames(), ('getCustomPath',))
def testStringIndexedAttr(self):
# indexed_attrs can also be a comma separated string
idx = self.addIndex(id='getId', extra={'indexed_attrs': 'getCustomPath, foo'})
# only the first attribute is used
self.assertEqual(idx.getIndexSourceNames(), ('getCustomPath',))
def testEmtpyListAttr(self):
# Empty indexed_attrs falls back to defaults
idx = self.addIndex(extra={'indexed_attrs': []})
self.assertEqual(idx.getIndexSourceNames(), ('getPhysicalPath',))
def testEmtpyStringAttr(self):
# Empty indexed_attrs falls back to defaults
idx = self.addIndex(extra={'indexed_attrs': ''})
self.assertEqual(idx.getIndexSourceNames(), ('getPhysicalPath',))
def test_suite():
from unittest import TestSuite, makeSuite
suite = TestSuite()
suite.addTest(makeSuite(TestIndexedAttrs))
return suite
if __name__ == '__main__':
framework()
# Copyright (c) 2004 Zope Corporation and Plone Solutions
# BSD license
import os, sys
if __name__ == '__main__':
execfile(os.path.join(sys.path[0], 'framework.py'))
from Products.PluginIndexes.PathIndex.tests import epitc
class TestPathIndex(epitc.PathIndexTestCase):
""" Test ExtendedPathIndex objects """
def testEmpty(self):
self.assertEqual(self._index.numObjects() ,0)
self.assertEqual(self._index.getEntryForObject(1234), None)
self._index.unindex_object( 1234 ) # nothrow
self.assertEqual(self._index._apply_index({"suxpath": "xxx"}), None)
def testUnIndex(self):
self._populateIndex()
self.assertEqual(self._index.numObjects(), 18)
for k in self._values.keys():
self._index.unindex_object(k)
self.assertEqual(self._index.numObjects(), 0)
self.assertEqual(len(self._index._index), 0)
self.assertEqual(len(self._index._unindex), 0)
def testReindex(self):
self._populateIndex()
self.assertEqual(self._index.numObjects(), 18)
o = epitc.Dummy('/foo/bar')
self._index.index_object(19, o)
self.assertEqual(self._index.numObjects(), 19)
self._index.index_object(19, o)
self.assertEqual(self._index.numObjects(), 19)
def testUnIndexError(self):
self._populateIndex()
# this should not raise an error
self._index.unindex_object(-1)
# nor should this
self._index._unindex[1] = "/broken/thing"
self._index.unindex_object(1)
def testRoot_1(self):
self._populateIndex()
tests = ( ("/", 0, range(1,19)), )
for comp, level, results in tests:
for path in [comp, "/"+comp, "/"+comp+"/"]:
res = self._index._apply_index(
{"path": {'query': path, "level": level}})
lst = list(res[0].keys())
self.assertEqual(lst, results)
for comp, level, results in tests:
for path in [comp, "/"+comp, "/"+comp+"/"]:
res = self._index._apply_index(
{"path": {'query': ((path, level),)}})
lst = list(res[0].keys())
self.assertEqual(lst, results)
def testRoot_2(self):
self._populateIndex()
tests = ( ("/", 0, range(1,19)), )
for comp,level,results in tests:
for path in [comp, "/"+comp, "/"+comp+"/"]:
res = self._index._apply_index(
{"path": {'query': path, "level": level}})
lst = list(res[0].keys())
self.assertEqual(lst, results)
for comp, level, results in tests:
for path in [comp, "/"+comp, "/"+comp+"/"]:
res = self._index._apply_index(
{"path": {'query': ((path, level),)}})
lst = list(res[0].keys())
self.assertEqual(lst, results)
def testSimpleTests(self):
self._populateIndex()
tests = [
("aa", 0, [1,2,3,4,5,6,7,8,9]),
("aa", 1, [1,2,3,10,11,12] ),
("bb", 0, [10,11,12,13,14,15,16,17,18]),
("bb", 1, [4,5,6,13,14,15]),
("bb/cc", 0, [16,17,18]),
("bb/cc", 1, [6,15]),
("bb/aa", 0, [10,11,12]),
("bb/aa", 1, [4,13]),
("aa/cc", -1, [3,7,8,9,12]),
("bb/bb", -1, [5,13,14,15]),
("18.html", 3, [18]),
("18.html", -1, [18]),
("cc/18.html", -1, [18]),
("cc/18.html", 2, [18]),
]
for comp, level, results in tests:
for path in [comp, "/"+comp, "/"+comp+"/"]:
res = self._index._apply_index(
{"path": {'query': path, "level": level}})
lst = list(res[0].keys())
self.assertEqual(lst, results)
for comp, level, results in tests:
for path in [comp, "/"+comp, "/"+comp+"/"]:
res = self._index._apply_index(
{"path": {'query': ((path, level),)}})
lst = list(res[0].keys())
self.assertEqual(lst, results)
def testComplexOrTests(self):
self._populateIndex()
tests = [
(['aa','bb'], 1, [1,2,3,4,5,6,10,11,12,13,14,15]),
(['aa','bb','xx'], 1, [1,2,3,4,5,6,10,11,12,13,14,15]),
([('cc',1), ('cc',2)], 0, [3,6,7,8,9,12,15,16,17,18]),
]
for lst, level, results in tests:
res = self._index._apply_index(
{"path": {'query': lst, "level": level, "operator": "or"}})
lst = list(res[0].keys())
self.assertEqual(lst, results)
def testComplexANDTests(self):
self._populateIndex()
tests = [
(['aa','bb'], 1, []),
([('aa',0), ('bb',1)], 0, [4,5,6]),
([('aa',0), ('cc',2)], 0, [3,6,9]),
]
for lst, level, results in tests:
res = self._index._apply_index(
{"path": {'query': lst, "level": level, "operator": "and"}})
lst = list(res[0].keys())
self.assertEqual(lst, results)
class TestExtendedPathIndex(epitc.ExtendedPathIndexTestCase):
""" Test ExtendedPathIndex objects """
def testIndexIntegrity(self):
self._populateIndex()
index = self._index._index
self.assertEqual(list(index[None][0].keys()), [1,8,16])
self.assertEqual(list(index[None][1].keys()), [2,9,10,17,18])
self.assertEqual(list(index[None][2].keys()), [3,5,11,13])
self.assertEqual(list(index[None][3].keys()), [4,6,7,12,14,15])
def testUnIndexError(self):
self._populateIndex()
# this should not raise an error
self._index.unindex_object(-1)
# nor should this
self._index._unindex[1] = "/broken/thing"
self._index.unindex_object(1)
def testDepthLimit(self):
self._populateIndex()
tests = [
('/', 0, 1, 0, [1,8,16]),
('/', 0, 2, 0, [1,2,8,9,10,16,17,18]),
('/', 0, 3, 0, [1,2,3,5,8,9,10,11,13,16,17,18]),
]
for lst, level, depth, navtree, results in tests:
res = self._index._apply_index(
{"path": {'query': lst, "level": level, "depth": depth, "navtree": navtree}})
lst = list(res[0].keys())
self.assertEqual(lst, results)
def testDefaultNavtree(self):
self._populateIndex()
# depth = 1 by default when using navtree
tests = [
('/' ,0,1,1,[1,8,16]),
('/aa' ,0,1,1,[1,2,8,9,16]),
('/aa' ,1,1,1,[2,3,9,10,13,17,18]),
('/aa/aa' ,0,1,1,[1,2,3,8,9,16]),
('/aa/aa/aa',0,1,1,[1,2,3,4,8,9,16]),
('/aa/bb' ,0,1,1,[1,2,5,8,9,16]),
('/bb' ,0,1,1,[1,8,10,16,17,18]),
('/bb/aa' ,0,1,1,[1,8,10,13,16,17,18]),
('/bb/bb' ,0,1,1,[1,8,10,11,16,17,18]),
]
for lst, level, depth, navtree, results in tests:
res = self._index._apply_index(
{"path": {'query': lst, "level": level, "depth": depth, "navtree": navtree}})
lst = list(res[0].keys())
self.assertEqual(lst,results)
def testShallowNavtree(self):
self._populateIndex()
# With depth 0 we only get the parents
tests = [
('/' ,0,0,1,[]),
('/aa' ,0,0,1,[8]),
('/aa' ,1,0,1,[18]),
('/aa/aa' ,0,0,1,[8]),
('/aa/aa/aa',0,0,1,[8]),
('/aa/bb' ,0,0,1,[8,9]),
('/bb' ,0,0,1,[16]),
('/bb/aa' ,0,0,1,[16,18]),
('/bb/bb' ,0,0,1,[16,17]),
('/bb/bb/aa' ,0,0,1,[16,17]),
]
for lst, level, depth, navtree, results in tests:
res = self._index._apply_index(
{"path": {'query': lst, "level": level, "depth": depth, "navtree": navtree}})
lst = list(res[0].keys())
self.assertEqual(lst,results)
def testNonexistingPaths(self):
self._populateIndex()
# With depth 0 we only get the parents
# When getting non existing paths,
# we should get as many parents as possible when building navtree
tests = [
('/' ,0,0,1,[]),
('/aa' ,0,0,1,[8]), # Exists
('/aa/x' ,0,0,1,[8]), # Doesn't exist
('/aa' ,1,0,1,[18]),
('/aa/x' ,1,0,1,[18]),
('/aa/aa' ,0,0,1,[8]),
('/aa/aa/x' ,0,0,1,[8]),
('/aa/bb' ,0,0,1,[8,9]),
('/aa/bb/x' ,0,0,1,[8,9]),
]
for lst, level, depth, navtree, results in tests:
res = self._index._apply_index(
{"path": {'query': lst, "level": level, "depth": depth, "navtree": navtree}})
lst = list(res[0].keys())
self.assertEqual(lst,results)
def test_suite():
from unittest import TestSuite, makeSuite
suite = TestSuite()
suite.addTest(makeSuite(TestPathIndex))
suite.addTest(makeSuite(TestExtendedPathIndex))
return suite
if __name__ == '__main__':
framework()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment