Commit ed87506a authored by Hanno Schlichting's avatar Hanno Schlichting

flake8

parent 036d773a
......@@ -27,58 +27,65 @@ from DateTime.DateTime import DateTime
from ExtensionClass import Base
from zExceptions import Redirect
class TemporalParadox(Exception):
pass
class HistorySelectionError(Exception):
pass
class HystoryJar:
"""A ZODB Connection-like object that provides access to data
but prevents history from being changed."""
def __init__(self, base):
self.__base__=base
self.__base__ = base
def __getattr__(self, name):
return getattr(self.__base__, name)
def commit(self, object, transaction):
if object._p_changed:
raise TemporalParadox, "You can't change history!"
raise TemporalParadox("You can't change history!")
def abort(*args, **kw): pass
def abort(*args, **kw):
pass
tpc_begin = tpc_finish = abort
def historicalRevision(self, serial):
state=self._p_jar.oldstate(self, serial)
rev=self.__class__.__basicnew__()
rev._p_jar=HystoryJar(self._p_jar)
rev._p_oid=self._p_oid
rev._p_serial=serial
state = self._p_jar.oldstate(self, serial)
rev = self.__class__.__basicnew__()
rev._p_jar = HystoryJar(self._p_jar)
rev._p_oid = self._p_oid
rev._p_serial = serial
rev.__setstate__(state)
rev._p_changed=0
rev._p_changed = 0
return rev
class Historian(Implicit):
"""An Historian's job is to find hysterical revisions of
objects, given a time."""
def __getitem__(self, key):
self=self.aq_parent
serial=apply(pack, ('>HHHH',)+tuple(map(int, key.split('.'))))
self = self.aq_parent
if serial == self._p_serial: return self
serial = pack(*('>HHHH',) + tuple(map(int, key.split('.'))))
if serial == self._p_serial:
return self
rev=historicalRevision(self, serial)
rev = historicalRevision(self, serial)
return rev.__of__(self.aq_parent)
def manage_workspace(self, REQUEST):
"We aren't real, so we delegate to that that spawned us!"
raise Redirect, REQUEST['URL2']+'/manage_change_history_page'
raise Redirect(REQUEST['URL2'] + '/manage_change_history_page')
class Historical(Base):
"""Mix-in class to provide a veiw that shows hystorical changes
......@@ -94,11 +101,11 @@ class Historical(Base):
security = ClassSecurityInfo()
HistoricalRevisions=Historian()
HistoricalRevisions = Historian()
manage_options=(
{'label':'History', 'action':'manage_change_history_page'},
)
manage_options = (
{'label': 'History', 'action': 'manage_change_history_page'},
)
security.declareProtected(view_history, 'manage_change_history_page')
manage_change_history_page = DTMLFile(
......@@ -108,63 +115,65 @@ class Historical(Base):
security.declareProtected(view_history, 'manage_change_history')
def manage_change_history(self):
first=0
last=20
request=getattr(self, 'REQUEST', None)
first = 0
last = 20
request = getattr(self, 'REQUEST', None)
if request is not None:
first=request.get('first_transaction', first)
last=request.get('last_transaction',last)
first = request.get('first_transaction', first)
last = request.get('last_transaction', last)
r=self._p_jar.db().history(self._p_oid, size=last)
r = self._p_jar.db().history(self._p_oid, size=last)
if r is None:
# storage doesn't support history
return ()
r=r[first:]
r = r[first:]
for d in r:
d['time']=DateTime(d['time'])
d['key']='.'.join(map(str, unpack(">HHHH", d['tid'])))
d['time'] = DateTime(d['time'])
d['key'] = '.'.join(map(str, unpack(">HHHH", d['tid'])))
return r
def manage_beforeHistoryCopy(self): pass # ? (Hook)
def manage_beforeHistoryCopy(self):
pass # ? (Hook)
def manage_historyCopy(self, keys=[], RESPONSE=None, URL1=None):
"Copy a selected revision to the present"
if not keys:
raise HistorySelectionError, (
raise HistorySelectionError(
"No historical revision was selected.<p>")
if len(keys) > 1:
raise HistorySelectionError, (
raise HistorySelectionError(
"Only one historical revision can be "
"copied to the present.<p>")
key=keys[0]
serial=apply(pack, ('>HHHH',)+tuple(map(int, key.split('.'))))
key = keys[0]
serial = pack(*('>HHHH',) + tuple(map(int, key.split('.'))))
if serial != self._p_serial:
self.manage_beforeHistoryCopy()
state=self._p_jar.oldstate(self, serial)
state = self._p_jar.oldstate(self, serial)
base = aq_base(self)
base._p_activate() # make sure we're not a ghost
base.__setstate__(state) # change the state
base._p_changed = True # marke object as dirty
base._p_activate() # make sure we're not a ghost
base.__setstate__(state) # change the state
base._p_changed = True # mark object as dirty
self.manage_afterHistoryCopy()
if RESPONSE is not None and URL1 is not None:
RESPONSE.redirect(URL1+'/manage_workspace')
def manage_afterHistoryCopy(self): pass # ? (Hook)
if RESPONSE is not None and URL1 is not None:
RESPONSE.redirect(URL1 + '/manage_workspace')
def manage_afterHistoryCopy(self):
pass # ? (Hook)
_manage_historyComparePage = DTMLFile(
'dtml/historyCompare', globals(), management_view='History')
security.declareProtected(view_history, 'manage_historyCompare')
def manage_historyCompare(self, rev1, rev2, REQUEST,
historyComparisonResults=''):
dt1=DateTime(rev1._p_mtime)
dt2=DateTime(rev2._p_mtime)
dt1 = DateTime(rev1._p_mtime)
dt2 = DateTime(rev2._p_mtime)
return self._manage_historyComparePage(
self, REQUEST,
dt1=dt1, dt2=dt2,
......@@ -174,68 +183,68 @@ class Historical(Base):
def manage_historicalComparison(self, REQUEST, keys=[]):
"Compare two selected revisions"
if not keys:
raise HistorySelectionError, (
raise HistorySelectionError(
"No historical revision was selected.<p>")
if len(keys) > 2:
raise HistorySelectionError, (
raise HistorySelectionError(
"Only two historical revision can be compared<p>")
serial=apply(pack, ('>HHHH',)+tuple(map(int, keys[-1].split('.'))))
rev1=historicalRevision(self, serial)
serial = pack(*('>HHHH',) + tuple(map(int, keys[-1].split('.'))))
rev1 = historicalRevision(self, serial)
if len(keys)==2:
serial=apply(pack,
('>HHHH',)+tuple(map(int, keys[0].split('.'))))
if len(keys) == 2:
serial = pack(*('>HHHH',) + tuple(map(int, keys[0].split('.'))))
rev2=historicalRevision(self, serial)
rev2 = historicalRevision(self, serial)
else:
rev2=self
rev2 = self
return self.manage_historyCompare(rev1, rev2, REQUEST)
InitializeClass(Historical)
def dump(tag, x, lo, hi, r):
r1=[]
r2=[]
for i in xrange(lo, hi):
r1 = []
r2 = []
for i in range(lo, hi):
r1.append(tag)
r2.append(x[i])
r.append("<tr>\n"
"<td><pre>\n%s\n</pre></td>\n"
"<td><pre>\n%s\n</pre></td>\n"
"</tr>\n"
% ('\n'.join(r1), escape('\n'.join(r2))))
"<td><pre>\n%s\n</pre></td>\n"
"<td><pre>\n%s\n</pre></td>\n"
"</tr>\n"
% ('\n'.join(r1), escape('\n'.join(r2))))
def replace(x, xlo, xhi, y, ylo, yhi, r):
rx1=[]
rx2=[]
for i in xrange(xlo, xhi):
def replace(x, xlo, xhi, y, ylo, yhi, r):
rx1 = []
rx2 = []
for i in range(xlo, xhi):
rx1.append('-')
rx2.append(x[i])
ry1=[]
ry2=[]
for i in xrange(ylo, yhi):
ry1 = []
ry2 = []
for i in range(ylo, yhi):
ry1.append('+')
ry2.append(y[i])
r.append("<tr>\n"
"<td><pre>\n%s\n%s\n</pre></td>\n"
"<td><pre>\n%s\n%s\n</pre></td>\n"
"</tr>\n"
% ('\n'.join(rx1), '\n'.join(ry1),
escape('\n'.join(rx2)), escape('\n'.join(ry2))))
"<td><pre>\n%s\n%s\n</pre></td>\n"
"<td><pre>\n%s\n%s\n</pre></td>\n"
"</tr>\n"
% ('\n'.join(rx1), '\n'.join(ry1),
escape('\n'.join(rx2)), escape('\n'.join(ry2))))
def html_diff(s1, s2):
a=s1.split('\n')
b=s2.split('\n')
cruncher=difflib.SequenceMatcher()
cruncher.set_seqs(a,b)
a = s1.split('\n')
b = s2.split('\n')
cruncher = difflib.SequenceMatcher()
cruncher.set_seqs(a, b)
r=['<table border=1>']
r = ['<table border=1>']
for tag, alo, ahi, blo, bhi in cruncher.get_opcodes():
if tag == 'replace':
replace(a, alo, ahi, b, blo, bhi, r)
......@@ -246,7 +255,7 @@ def html_diff(s1, s2):
elif tag == 'equal':
dump(' ', a, alo, ahi, r)
else:
raise ValueError, 'unknown tag ' + `tag`
raise ValueError('unknown tag %r' % tag)
r.append('</table>')
return '\n'.join(r)
......@@ -95,20 +95,21 @@ See module comments for details and programmatic interface.
# is sent to stdout. Or you can call main(args), passing what would
# have been in sys.argv[1:] had the cmd-line form been used.
import re
TRACE = 0
# define what "junk" means
import re
def IS_LINE_JUNK(line, pat=re.compile(r"\s*#?\s*$").match):
return pat(line) is not None
def IS_CHARACTER_JUNK(ch, ws=" \t"):
return ch in ws
del re
class SequenceMatcher:
class SequenceMatcher(object):
def __init__(self, isjunk=None, a='', b=''):
# Members:
# a
......@@ -277,9 +278,9 @@ class SequenceMatcher:
continue
if j >= bhi:
break
k = newj2len[j] = j2lenget(j-1, 0) + 1
k = newj2len[j] = j2lenget(j - 1, 0) + 1
if k > bestsize:
besti, bestj, bestsize = i-k+1, j-k+1, k
besti, bestj, bestsize = i - k + 1, j - k + 1, k
j2len = newj2len
# Now that we have a wholly interesting match (albeit possibly
......@@ -290,17 +291,14 @@ class SequenceMatcher:
# interesting match, this is clearly the right thing to do,
# because no other kind of match is possible in the regions.
while besti > alo and bestj > blo and \
isbjunk(b[bestj-1]) and \
a[besti-1] == b[bestj-1]:
besti, bestj, bestsize = besti-1, bestj-1, bestsize+1
while besti+bestsize < ahi and bestj+bestsize < bhi and \
isbjunk(b[bestj+bestsize]) and \
a[besti+bestsize] == b[bestj+bestsize]:
isbjunk(b[bestj - 1]) and \
a[besti - 1] == b[bestj - 1]:
besti, bestj, bestsize = besti - 1, bestj - 1, bestsize + 1
while besti + bestsize < ahi and bestj + bestsize < bhi and \
isbjunk(b[bestj + bestsize]) and \
a[besti + bestsize] == b[bestj + bestsize]:
bestsize = bestsize + 1
if TRACE:
print "get_matching_blocks", alo, ahi, blo, bhi
print " returns", besti, bestj, bestsize
return besti, bestj, bestsize
def get_matching_blocks(self):
......@@ -309,9 +307,7 @@ class SequenceMatcher:
self.matching_blocks = []
la, lb = len(self.a), len(self.b)
self.__helper(0, la, 0, lb, self.matching_blocks)
self.matching_blocks.append( (la, lb, 0) )
if TRACE:
print '*** matching blocks', self.matching_blocks
self.matching_blocks.append((la, lb, 0))
return self.matching_blocks
# builds list of matching blocks covering a[alo:ahi] and
......@@ -326,8 +322,8 @@ class SequenceMatcher:
if alo < i and blo < j:
self.__helper(alo, i, blo, j, answer)
answer.append(x)
if i+k < ahi and j+k < bhi:
self.__helper(i+k, ahi, j+k, bhi, answer)
if i + k < ahi and j + k < bhi:
self.__helper(i + k, ahi, j + k, bhi, answer)
def ratio(self):
"""Return a measure of the sequences' similarity (float in [0,1]).
......@@ -392,25 +388,29 @@ class SequenceMatcher:
elif j < bj:
tag = 'insert'
if tag:
answer.append( (tag, i, ai, j, bj) )
i, j = ai+size, bj+size
answer.append((tag, i, ai, j, bj))
i, j = ai + size, bj + size
# the list of matching blocks is terminated by a
# sentinel with size 0
if size:
answer.append( ('equal', ai, i, bj, j) )
answer.append(('equal', ai, i, bj, j))
return answer
# meant for dumping lines
def dump(tag, x, lo, hi):
# meant for dumping lines
for i in xrange(lo, hi):
print tag, x[i],
print(tag, x[i])
# figure out which mark to stick under characters in lines that
# have changed (blank = same, - = deleted, + = inserted, ^ = replaced)
_combine = { ' ': ' ',
'. ': '-',
' .': '+',
'..': '^' }
_combine = {
' ': ' ',
'. ': '-',
' .': '+',
'..': '^',
}
def plain_replace(a, alo, ahi, b, blo, bhi):
assert alo < ahi and blo < bhi
......@@ -428,12 +428,8 @@ def plain_replace(a, alo, ahi, b, blo, bhi):
# used as a synch point, and intraline difference marking is done on
# the similar pair. Lots of work, but often worth it.
def fancy_replace(a, alo, ahi, b, blo, bhi):
if TRACE:
print '*** fancy_replace', alo, ahi, blo, bhi
dump('>', a, alo, ahi)
dump('<', b, blo, bhi)
def fancy_replace(a, alo, ahi, b, blo, bhi):
# don't synch up unless the lines have a similarity score of at
# least cutoff; best_ratio tracks the best score seen so far
best_ratio, cutoff = 0.74, 0.75
......@@ -460,8 +456,8 @@ def fancy_replace(a, alo, ahi, b, blo, bhi):
# time it's called on a sequence pair; the expensive part
# of the computation is cached by cruncher
if cruncher.real_quick_ratio() > best_ratio and \
cruncher.quick_ratio() > best_ratio and \
cruncher.ratio() > best_ratio:
cruncher.quick_ratio() > best_ratio and \
cruncher.ratio() > best_ratio:
best_ratio, best_i, best_j = cruncher.ratio(), i, j
if best_ratio < cutoff:
# no non-identical "pretty close" pair
......@@ -475,13 +471,6 @@ def fancy_replace(a, alo, ahi, b, blo, bhi):
# there's a close pair, so forget the identical pair (if any)
eqi = None
# a[best_i] very similar to b[best_j]; eqi is None iff they're not
# identical
if TRACE:
print '*** best_ratio', best_ratio, best_i, best_j
dump('>', a, best_i, best_i+1)
dump('<', b, best_j, best_j+1)
# pump out diffs from before the synch point
fancy_helper(a, alo, best_i, b, blo, best_j)
......@@ -504,13 +493,13 @@ def fancy_replace(a, alo, ahi, b, blo, bhi):
atags = atags + ' ' * la
btags = btags + ' ' * lb
else:
raise ValueError, 'unknown tag ' + `tag`
raise ValueError('unknown tag %r' % tag)
la, lb = len(atags), len(btags)
if la < lb:
atags = atags + ' ' * (lb - la)
elif lb < la:
btags = btags + ' ' * (la - lb)
combined = map(lambda x,y: _combine[x+y], atags, btags)
combined = map(lambda x, y: _combine[x + y], atags, btags)
print '-', aelt, '+', belt, '?', \
''.join(combined).rstrip()
else:
......@@ -518,7 +507,8 @@ def fancy_replace(a, alo, ahi, b, blo, bhi):
print ' ', aelt,
# pump out diffs from after the synch point
fancy_helper(a, best_i+1, ahi, b, best_j+1, bhi)
fancy_helper(a, best_i + 1, ahi, b, best_j + 1, bhi)
def fancy_helper(a, alo, ahi, b, blo, bhi):
if alo < ahi:
......@@ -529,6 +519,7 @@ def fancy_helper(a, alo, ahi, b, blo, bhi):
elif blo < bhi:
dump('+', b, blo, bhi)
def fail(msg):
import sys
out = sys.stderr.write
......@@ -536,23 +527,27 @@ def fail(msg):
out(__doc__)
return 0
# open a file & return the file object; gripe and return 0 if it
# couldn't be opened
def fopen(fname):
# open a file & return the file object; gripe and return 0 if it
# couldn't be opened
try:
return open(fname, 'r')
except IOError, detail:
return fail("couldn't open " + fname + ": " + str(detail))
# open two files & spray the diff to stdout; return false iff a problem
def fcompare(f1name, f2name):
# open two files & spray the diff to stdout; return false iff a problem
f1 = fopen(f1name)
f2 = fopen(f2name)
if not f1 or not f2:
return 0
a = f1.readlines(); f1.close()
b = f2.readlines(); f2.close()
a = f1.readlines()
f1.close()
b = f2.readlines()
f2.close()
cruncher = SequenceMatcher(IS_LINE_JUNK, a, b)
for tag, alo, ahi, blo, bhi in cruncher.get_opcodes():
......@@ -565,62 +560,6 @@ def fcompare(f1name, f2name):
elif tag == 'equal':
dump(' ', a, alo, ahi)
else:
raise ValueError, 'unknown tag ' + `tag`
raise ValueError('unknown tag %r' + tag)
return 1
# crack args (sys.argv[1:] is normal) & compare;
# return false iff a problem
def main(args):
import getopt
try:
opts, args = getopt.getopt(args, "qr:")
except getopt.error, detail:
return fail(str(detail))
noisy = 1
qseen = rseen = 0
for opt, val in opts:
if opt == "-q":
qseen = 1
noisy = 0
elif opt == "-r":
rseen = 1
whichfile = val
if qseen and rseen:
return fail("can't specify both -q and -r")
if rseen:
if args:
return fail("no args allowed with -r option")
if whichfile in "12":
restore(whichfile)
return 1
return fail("-r value must be 1 or 2")
if len(args) != 2:
return fail("need 2 filename args")
f1name, f2name = args
if noisy:
print '-:', f1name
print '+:', f2name
return fcompare(f1name, f2name)
def restore(which):
import sys
tag = {"1": "- ", "2": "+ "}[which]
prefixes = (" ", tag)
for line in sys.stdin.readlines():
if line[:2] in prefixes:
print line[2:],
if __name__ == '__main__':
import sys
args = sys.argv[1:]
if "-profile" in args:
import profile, pstats
args.remove("-profile")
statf = "ndiff.pro"
profile.run("main(args)", statf)
stats = pstats.Stats(statf)
stats.strip_dirs().sort_stats('time').print_stats()
else:
main(args)
......@@ -10,12 +10,6 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Tests demonstrating consequences of guarded_getattr fix from 2004/08/07
http://mail.zope.org/pipermail/zope-checkins/2004-August/028152.html
http://zope.org/Collectors/CMF/259
"""
import unittest
......@@ -41,6 +35,7 @@ class AllowedItem(SimpleItem):
InitializeClass(AllowedItem)
class DeniedItem(SimpleItem):
id = 'denied'
security = ClassSecurityInfo()
......@@ -48,6 +43,7 @@ class DeniedItem(SimpleItem):
InitializeClass(DeniedItem)
class ProtectedItem(SimpleItem):
id = 'protected'
security = ClassSecurityInfo()
......@@ -74,7 +70,8 @@ class TestGetAttr(unittest.TestCase):
self.app.manage_addFolder('plain_folder')
# We also want to be able to acquire simple attributes
self.app.manage_addProperty(id='simple_type', type='string', value='a string')
self.app.manage_addProperty(
id='simple_type', type='string', value='a string')
# Set up a subfolder and the objects we want to acquire from
self.app.manage_addFolder('subfolder')
......@@ -142,7 +139,6 @@ class TestGetAttr(unittest.TestCase):
class TestGetAttrAnonymous(TestGetAttr):
# Run all tests again as Anonymous User
def setUp(self):
......@@ -151,23 +147,15 @@ class TestGetAttrAnonymous(TestGetAttr):
noSecurityManager()
class TestGetAttr_c(TestGetAttr):
class TestGetAttrC(TestGetAttr):
def setUp(self):
TestGetAttr.setUp(self)
self.guarded_getattr = guarded_getattr_c
class TestGetAttrAnonymous_c(TestGetAttrAnonymous):
class TestGetAttrAnonymousC(TestGetAttrAnonymous):
def setUp(self):
TestGetAttrAnonymous.setUp(self)
self.guarded_getattr = guarded_getattr_c
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestGetAttr))
suite.addTest(unittest.makeSuite(TestGetAttrAnonymous))
suite.addTest(unittest.makeSuite(TestGetAttr_c))
suite.addTest(unittest.makeSuite(TestGetAttrAnonymous_c))
return suite
......@@ -47,27 +47,27 @@ class ApplicationTests(unittest.TestCase):
self.assertEqual(app.title_and_id(), 'Other')
self.assertEqual(app.title_or_id(), 'Other')
def test___bobo_traverse__attribute_hit(self):
def test_bobo_traverse_attribute_hit(self):
app = self._makeOne()
app.NAME = 'attribute'
app._getOb = lambda x, y: x
request = {}
self.assertEqual(app.__bobo_traverse__(request, 'NAME'), 'attribute')
def test___bobo_traverse__attribute_miss_key_hit(self):
def test_bobo_traverse_attribute_miss_key_hit(self):
app = self._makeOne()
app._getOb = lambda x, y: x
app._objects = [{'id': 'OTHER', 'meta_type': None}]
request = {}
self.assertEqual(app.__bobo_traverse__(request, 'OTHER'), 'OTHER')
def test___bobo_traverse__attribute_key_miss_R_M_default_real_request(self):
def test_bobo_traverse_attribute_key_miss_R_M_default_real_request(self):
from UserDict import UserDict
request = UserDict()
class _Response:
def notFoundError(self, msg):
1/0
1 / 0
request.RESPONSE = _Response()
app = self._makeOne()
......@@ -76,21 +76,21 @@ class ApplicationTests(unittest.TestCase):
self.assertRaises(ZeroDivisionError,
app.__bobo_traverse__, request, 'NONESUCH')
def test___bobo_traverse__attribute_key_miss_R_M_default_fake_request(self):
def test_bobo_traverse_attribute_key_miss_R_M_default_fake_request(self):
app = self._makeOne()
app._getOb = _noWay
request = {}
self.assertRaises(KeyError, app.__bobo_traverse__, request, 'NONESUCH')
def test___bobo_traverse__attribute_key_miss_R_M_is_GET(self):
def test_bobo_traverse_attribute_key_miss_R_M_is_GET(self):
app = self._makeOne()
app._getOb = _noWay
request = {'REQUEST_METHOD': 'GET'}
self.assertRaises(KeyError, app.__bobo_traverse__, request, 'NONESUCH')
def test___bobo_traverse__attribute_key_miss_R_M_not_GET_POST(self):
def test_bobo_traverse_attribute_key_miss_R_M_not_GET_POST(self):
from OFS import bbb
if bbb.HAS_ZSERVER:
from webdav.NullResource import NullResource
......
......@@ -32,9 +32,3 @@ class CacheTests(unittest.TestCase):
# The parent_cache should still trigger managersExist
self.assertTrue(managersExist(root.child.child_content))
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(CacheTests))
return suite
......@@ -14,6 +14,7 @@
import unittest
from Testing import ZopeTestCase
class TestRecursiveChangeOwnership(ZopeTestCase.ZopeTestCase):
user_name2 = "dumdidum"
user_pass2 = "dumdidum"
......@@ -27,10 +28,10 @@ class TestRecursiveChangeOwnership(ZopeTestCase.ZopeTestCase):
# remember user objects
# is the __of__() call correct? is it needed? without it ownerInfo in
# owner.py throws an AttributeError ...
self.user1 = self.folder['acl_users'].getUser(ZopeTestCase.user_name
).__of__(self.folder)
self.user2 = self.folder['acl_users'].getUser(self.user_name2
).__of__(self.folder)
self.user1 = self.folder['acl_users'].getUser(
ZopeTestCase.user_name).__of__(self.folder)
self.user2 = self.folder['acl_users'].getUser(
self.user_name2).__of__(self.folder)
self.folder.changeOwnership(self.user1)
......
......@@ -15,57 +15,48 @@ from Testing.makerequest import makerequest
ADD_IMAGES_AND_FILES = 'Add images and files'
FILE_META_TYPES = ( { 'name' : 'File'
, 'action' : 'manage_addFile'
, 'permission' : ADD_IMAGES_AND_FILES
}
,
)
FILE_META_TYPES = ({
'name': 'File',
'action': 'manage_addFile',
'permission': ADD_IMAGES_AND_FILES
}, )
class UnitTestSecurityPolicy:
"""Stub out the existing security policy for unit testing purposes.
"""
Stub out the existing security policy for unit testing purposes.
"""
#
# Standard SecurityPolicy interface
#
def validate( self
, accessed=None
, container=None
, name=None
, value=None
, context=None
, roles=None
, *args
, **kw):
def validate(self, accessed=None, container=None, name=None, value=None,
context=None, roles=None, *args, **kw):
return 1
def checkPermission( self, permission, object, context) :
def checkPermission(self, permission, object, context):
return 1
class UnitTestUser( Implicit ):
"""
Stubbed out manager for unit testing purposes.
class UnitTestUser(Implicit):
"""Stubbed out manager for unit testing purposes.
"""
def getId( self ):
def getId(self):
return 'unit_tester'
getUserName = getId
def allowed( self, object, object_roles=None ):
def allowed(self, object, object_roles=None):
return 1
def makeConnection():
import ZODB
from ZODB.DemoStorage import DemoStorage
s = DemoStorage()
return ZODB.DB( s ).open()
return ZODB.DB(s).open()
class CopySupportTestBase(unittest.TestCase):
def _initFolders(self):
self.connection = makeConnection()
try:
r = self.connection.root()
......@@ -73,14 +64,13 @@ class CopySupportTestBase(unittest.TestCase):
r['Application'] = a
self.root = a
responseOut = self.responseOut = cStringIO.StringIO()
self.app = makerequest( self.root, stdout=responseOut )
manage_addFolder( self.app, 'folder1' )
manage_addFolder( self.app, 'folder2' )
folder1 = getattr( self.app, 'folder1' )
folder2 = getattr( self.app, 'folder2' )
self.app = makerequest(self.root, stdout=responseOut)
manage_addFolder(self.app, 'folder1')
manage_addFolder(self.app, 'folder2')
folder1 = getattr(self.app, 'folder1')
manage_addFile( folder1, 'file'
, file='', content_type='text/plain')
manage_addFile(
folder1, 'file', file='', content_type='text/plain')
# Hack, we need a _p_mtime for the file, so we make sure that it
# has one. We use a subtransaction, which means we can rollback
......@@ -91,10 +81,9 @@ class CopySupportTestBase(unittest.TestCase):
raise
transaction.begin()
return self.app._getOb( 'folder1' ), self.app._getOb( 'folder2' )
def _cleanApp( self ):
return self.app._getOb('folder1'), self.app._getOb('folder2')
def _cleanApp(self):
transaction.abort()
self.app._p_jar.sync()
self.connection.close()
......@@ -103,25 +92,23 @@ class CopySupportTestBase(unittest.TestCase):
del self.root
del self.connection
class TestCopySupport( CopySupportTestBase ):
def setUp( self ):
class TestCopySupport(CopySupportTestBase):
def setUp(self):
folder1, folder2 = self._initFolders()
folder1.all_meta_types = folder2.all_meta_types = FILE_META_TYPES
self.folder1 = folder1
self.folder2 = folder2
self.policy = UnitTestSecurityPolicy()
self.oldPolicy = SecurityManager.setSecurityPolicy( self.policy )
newSecurityManager( None, UnitTestUser().__of__( self.root ) )
def tearDown( self ):
self.oldPolicy = SecurityManager.setSecurityPolicy(self.policy)
newSecurityManager(None, UnitTestUser().__of__(self.root))
def tearDown(self):
noSecurityManager()
SecurityManager.setSecurityPolicy( self.oldPolicy )
SecurityManager.setSecurityPolicy(self.oldPolicy)
del self.oldPolicy
del self.policy
del self.folder2
......@@ -139,27 +126,27 @@ class TestCopySupport( CopySupportTestBase ):
verifyClass(ICopyContainer, CopyContainer)
verifyClass(ICopySource, CopySource)
def testRename( self ):
self.assertTrue( 'file' in self.folder1.objectIds() )
self.folder1.manage_renameObject( id='file', new_id='filex' )
self.assertFalse( 'file' in self.folder1.objectIds() )
self.assertTrue( 'filex' in self.folder1.objectIds() )
def testCopy( self ):
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertFalse( 'file' in self.folder2.objectIds() )
cookie = self.folder1.manage_copyObjects( ids=('file',) )
self.folder2.manage_pasteObjects( cookie )
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertTrue( 'file' in self.folder2.objectIds() )
def testCut( self ):
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertFalse( 'file' in self.folder2.objectIds() )
cookie = self.folder1.manage_cutObjects( ids=('file',) )
self.folder2.manage_pasteObjects( cookie )
self.assertFalse( 'file' in self.folder1.objectIds() )
self.assertTrue( 'file' in self.folder2.objectIds() )
def testRename(self):
self.assertTrue('file' in self.folder1.objectIds())
self.folder1.manage_renameObject(id='file', new_id='filex')
self.assertFalse('file' in self.folder1.objectIds())
self.assertTrue('filex' in self.folder1.objectIds())
def testCopy(self):
self.assertTrue('file' in self.folder1.objectIds())
self.assertFalse('file' in self.folder2.objectIds())
cookie = self.folder1.manage_copyObjects(ids=('file',))
self.folder2.manage_pasteObjects(cookie)
self.assertTrue('file' in self.folder1.objectIds())
self.assertTrue('file' in self.folder2.objectIds())
def testCut(self):
self.assertTrue('file' in self.folder1.objectIds())
self.assertFalse('file' in self.folder2.objectIds())
cookie = self.folder1.manage_cutObjects(ids=('file',))
self.folder2.manage_pasteObjects(cookie)
self.assertFalse('file' in self.folder1.objectIds())
self.assertTrue('file' in self.folder2.objectIds())
def testCopyNewObject(self):
self.assertFalse('newfile' in self.folder1.objectIds())
......@@ -170,54 +157,54 @@ class TestCopySupport( CopySupportTestBase ):
self.assertTrue('newfile' in self.folder1.objectIds())
self.assertTrue('newfile' in self.folder2.objectIds())
def testPasteSingleNotSameID( self ):
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertFalse( 'file' in self.folder2.objectIds() )
cookie = self.folder1.manage_copyObjects( ids=('file',) )
result = self.folder2.manage_pasteObjects( cookie )
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertTrue( 'file' in self.folder2.objectIds() )
self.assertTrue( result == [{'id':'file', 'new_id':'file'}])
def testPasteSingleSameID( self ):
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertFalse( 'file' in self.folder2.objectIds() )
def testPasteSingleNotSameID(self):
self.assertTrue('file' in self.folder1.objectIds())
self.assertFalse('file' in self.folder2.objectIds())
cookie = self.folder1.manage_copyObjects(ids=('file',))
result = self.folder2.manage_pasteObjects(cookie)
self.assertTrue('file' in self.folder1.objectIds())
self.assertTrue('file' in self.folder2.objectIds())
self.assertTrue(result == [{'id': 'file', 'new_id': 'file'}])
def testPasteSingleSameID(self):
self.assertTrue('file' in self.folder1.objectIds())
self.assertFalse('file' in self.folder2.objectIds())
manage_addFile(self.folder2, 'file',
file='', content_type='text/plain')
cookie = self.folder1.manage_copyObjects( ids=('file',) )
result = self.folder2.manage_pasteObjects( cookie )
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertTrue( 'file' in self.folder2.objectIds() )
self.assertTrue( 'copy_of_file' in self.folder2.objectIds() )
self.assertTrue( result == [{'id':'file', 'new_id':'copy_of_file'}])
cookie = self.folder1.manage_copyObjects(ids=('file',))
result = self.folder2.manage_pasteObjects(cookie)
self.assertTrue('file' in self.folder1.objectIds())
self.assertTrue('file' in self.folder2.objectIds())
self.assertTrue('copy_of_file' in self.folder2.objectIds())
self.assertTrue(result == [{'id': 'file', 'new_id': 'copy_of_file'}])
def testPasteSingleSameIDMultipleTimes(self):
cookie = self.folder1.manage_copyObjects(ids=('file',))
result = self.folder1.manage_pasteObjects(cookie)
self.assertEqual(self.folder1.objectIds(), ['file', 'copy_of_file'])
self.assertEqual(result, [{'id':'file', 'new_id':'copy_of_file'}])
self.assertEqual(result, [{'id': 'file', 'new_id': 'copy_of_file'}])
# make another copy of file
cookie = self.folder1.manage_copyObjects(ids=('file',))
result = self.folder1.manage_pasteObjects(cookie)
self.assertEqual(self.folder1.objectIds(),
['file', 'copy_of_file', 'copy2_of_file'])
self.assertEqual(result, [{'id':'file', 'new_id':'copy2_of_file'}])
self.assertEqual(result, [{'id': 'file', 'new_id': 'copy2_of_file'}])
# now copy the copy
cookie = self.folder1.manage_copyObjects(ids=('copy_of_file',))
result = self.folder1.manage_pasteObjects(cookie)
self.assertEqual(self.folder1.objectIds(),
['file', 'copy_of_file', 'copy2_of_file',
'copy3_of_file'])
self.assertEqual(result, [{'id':'copy_of_file',
'new_id':'copy3_of_file'}])
self.assertEqual(result, [{'id': 'copy_of_file',
'new_id': 'copy3_of_file'}])
# or copy another copy
cookie = self.folder1.manage_copyObjects(ids=('copy2_of_file',))
result = self.folder1.manage_pasteObjects(cookie)
self.assertEqual(self.folder1.objectIds(),
['file', 'copy_of_file', 'copy2_of_file',
'copy3_of_file', 'copy4_of_file'])
self.assertEqual(result, [{'id':'copy2_of_file',
'new_id':'copy4_of_file'}])
self.assertEqual(result, [{'id': 'copy2_of_file',
'new_id': 'copy4_of_file'}])
def testPasteSpecialName(self):
manage_addFile(self.folder1, 'copy_of_',
......@@ -226,142 +213,147 @@ class TestCopySupport( CopySupportTestBase ):
result = self.folder1.manage_pasteObjects(cookie)
self.assertEqual(self.folder1.objectIds(),
['file', 'copy_of_', 'copy2_of_'])
self.assertEqual(result, [{'id':'copy_of_', 'new_id':'copy2_of_'}])
self.assertEqual(result, [{'id': 'copy_of_', 'new_id': 'copy2_of_'}])
def testPasteMultiNotSameID( self ):
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertFalse( 'file1' in self.folder1.objectIds() )
def testPasteMultiNotSameID(self):
self.assertTrue('file' in self.folder1.objectIds())
self.assertFalse('file1' in self.folder1.objectIds())
manage_addFile(self.folder1, 'file1',
file='', content_type='text/plain')
self.assertFalse( 'file2' in self.folder1.objectIds() )
self.assertFalse('file2' in self.folder1.objectIds())
manage_addFile(self.folder1, 'file2',
file='', content_type='text/plain')
self.assertFalse( 'file' in self.folder2.objectIds() )
self.assertFalse( 'file1' in self.folder2.objectIds() )
self.assertFalse( 'file2' in self.folder2.objectIds() )
cookie = self.folder1.manage_copyObjects( ids=('file','file1','file2',) )
result = self.folder2.manage_pasteObjects( cookie )
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertTrue( 'file1' in self.folder1.objectIds() )
self.assertTrue( 'file2' in self.folder1.objectIds() )
self.assertTrue( 'file' in self.folder2.objectIds() )
self.assertTrue( 'file1' in self.folder2.objectIds() )
self.assertTrue( 'file2' in self.folder2.objectIds() )
self.assertTrue( result == [{'id':'file', 'new_id':'file'},
{'id':'file1', 'new_id':'file1'},
{'id':'file2', 'new_id':'file2'}])
def testPasteMultiSameID( self ):
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertFalse( 'file1' in self.folder1.objectIds() )
self.assertFalse('file' in self.folder2.objectIds())
self.assertFalse('file1' in self.folder2.objectIds())
self.assertFalse('file2' in self.folder2.objectIds())
cookie = self.folder1.manage_copyObjects(
ids=('file', 'file1', 'file2',))
result = self.folder2.manage_pasteObjects(cookie)
self.assertTrue('file' in self.folder1.objectIds())
self.assertTrue('file1' in self.folder1.objectIds())
self.assertTrue('file2' in self.folder1.objectIds())
self.assertTrue('file' in self.folder2.objectIds())
self.assertTrue('file1' in self.folder2.objectIds())
self.assertTrue('file2' in self.folder2.objectIds())
self.assertEqual(result, [
{'id': 'file', 'new_id': 'file'},
{'id': 'file1', 'new_id': 'file1'},
{'id': 'file2', 'new_id': 'file2'},
])
def testPasteMultiSameID(self):
self.assertTrue('file' in self.folder1.objectIds())
self.assertFalse('file1' in self.folder1.objectIds())
manage_addFile(self.folder1, 'file1',
file='', content_type='text/plain')
self.assertFalse( 'file2' in self.folder1.objectIds() )
self.assertFalse('file2' in self.folder1.objectIds())
manage_addFile(self.folder1, 'file2',
file='', content_type='text/plain')
self.assertFalse( 'file' in self.folder2.objectIds() )
self.assertFalse('file' in self.folder2.objectIds())
manage_addFile(self.folder2, 'file',
file='', content_type='text/plain')
self.assertFalse( 'file1' in self.folder2.objectIds() )
self.assertFalse('file1' in self.folder2.objectIds())
manage_addFile(self.folder2, 'file1',
file='', content_type='text/plain')
self.assertFalse( 'file2' in self.folder2.objectIds() )
self.assertFalse('file2' in self.folder2.objectIds())
manage_addFile(self.folder2, 'file2',
file='', content_type='text/plain')
cookie = self.folder1.manage_copyObjects( ids=('file','file1','file2',) )
result = self.folder2.manage_pasteObjects( cookie )
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertTrue( 'file1' in self.folder1.objectIds() )
self.assertTrue( 'file2' in self.folder1.objectIds() )
self.assertTrue( 'file' in self.folder2.objectIds() )
self.assertTrue( 'file1' in self.folder2.objectIds() )
self.assertTrue( 'file2' in self.folder2.objectIds() )
self.assertTrue( 'copy_of_file' in self.folder2.objectIds() )
self.assertTrue( 'copy_of_file1' in self.folder2.objectIds() )
self.assertTrue( 'copy_of_file2' in self.folder2.objectIds() )
self.assertTrue( result == [{'id':'file', 'new_id':'copy_of_file'},
{'id':'file1', 'new_id':'copy_of_file1'},
{'id':'file2', 'new_id':'copy_of_file2'}])
cookie = self.folder1.manage_copyObjects(
ids=('file', 'file1', 'file2',))
result = self.folder2.manage_pasteObjects(cookie)
self.assertTrue('file' in self.folder1.objectIds())
self.assertTrue('file1' in self.folder1.objectIds())
self.assertTrue('file2' in self.folder1.objectIds())
self.assertTrue('file' in self.folder2.objectIds())
self.assertTrue('file1' in self.folder2.objectIds())
self.assertTrue('file2' in self.folder2.objectIds())
self.assertTrue('copy_of_file' in self.folder2.objectIds())
self.assertTrue('copy_of_file1' in self.folder2.objectIds())
self.assertTrue('copy_of_file2' in self.folder2.objectIds())
self.assertEqual(result, [
{'id': 'file', 'new_id': 'copy_of_file'},
{'id': 'file1', 'new_id': 'copy_of_file1'},
{'id': 'file2', 'new_id': 'copy_of_file2'},
])
class _SensitiveSecurityPolicy:
def __init__( self, validate_lambda, checkPermission_lambda ):
self._lambdas = ( validate_lambda, checkPermission_lambda )
def __init__(self, validate_lambda, checkPermission_lambda):
self._lambdas = (validate_lambda, checkPermission_lambda)
def validate( self, *args, **kw ):
def validate(self, *args, **kw):
from zExceptions import Unauthorized
allowed = self._lambdas[ 0 ]( *args, **kw )
allowed = self._lambdas[0](*args, **kw)
if not allowed:
raise Unauthorized
return 1
def checkPermission( self, *args, **kw ) :
return self._lambdas[ 1 ]( *args, **kw )
def checkPermission(self, *args, **kw):
return self._lambdas[1](*args, **kw)
class _AllowedUser(UnitTestUser):
class _AllowedUser( UnitTestUser ):
def __init__(self, allowed_lambda):
self._lambdas = (allowed_lambda, )
def __init__( self, allowed_lambda ):
self._lambdas = ( allowed_lambda, )
def allowed(self, object, object_roles=None):
return self._lambdas[0](object, object_roles)
def allowed( self, object, object_roles=None ):
return self._lambdas[ 0 ]( object, object_roles )
class TestCopySupportSecurity( CopySupportTestBase ):
class TestCopySupportSecurity(CopySupportTestBase):
_old_policy = None
def setUp( self ):
def setUp(self):
self._scrubSecurity()
def tearDown( self ):
def tearDown(self):
self._scrubSecurity()
self._cleanApp()
def _scrubSecurity( self ):
def _scrubSecurity(self):
noSecurityManager()
if self._old_policy is not None:
SecurityManager.setSecurityPolicy( self._old_policy )
SecurityManager.setSecurityPolicy(self._old_policy)
def _assertCopyErrorUnauth( self, callable, *args, **kw ):
def _assertCopyErrorUnauth(self, callable, *args, **kw):
import re
from zExceptions import Unauthorized
from OFS.CopySupport import CopyError
ce_regex = kw.get( 'ce_regex' )
ce_regex = kw.get('ce_regex')
if ce_regex is not None:
del kw[ 'ce_regex' ]
del kw['ce_regex']
try:
callable( *args, **kw )
callable(*args, **kw)
except CopyError, e:
except CopyError as e:
if ce_regex is not None:
pattern = re.compile( ce_regex, re.DOTALL )
if pattern.search( e.args[0] ) is None:
self.fail( "Paste failed; didn't match pattern:\n%s" % e )
pattern = re.compile(ce_regex, re.DOTALL)
if pattern.search(e.args[0]) is None:
self.fail("Paste failed; didn't match pattern:\n%s" % e)
else:
self.fail( "Paste failed; no pattern:\n%s" % e )
self.fail("Paste failed; no pattern:\n%s" % e)
except Unauthorized, e:
except Unauthorized:
pass
else:
self.fail( "Paste allowed unexpectedly." )
def _initPolicyAndUser( self
, a_lambda=None
, v_lambda=None
, c_lambda=None
):
def _promiscuous( *args, **kw ):
self.fail("Paste allowed unexpectedly.")
def _initPolicyAndUser(self, a_lambda=None, v_lambda=None, c_lambda=None):
def _promiscuous(*args, **kw):
return 1
if a_lambda is None:
......@@ -373,155 +365,120 @@ class TestCopySupportSecurity( CopySupportTestBase ):
if c_lambda is None:
c_lambda = _promiscuous
scp = _SensitiveSecurityPolicy( v_lambda, c_lambda )
self._old_policy = SecurityManager.setSecurityPolicy( scp )
newSecurityManager( None
, _AllowedUser( a_lambda ).__of__( self.root ) )
scp = _SensitiveSecurityPolicy(v_lambda, c_lambda)
self._old_policy = SecurityManager.setSecurityPolicy(scp)
def test_copy_baseline( self ):
newSecurityManager(None, _AllowedUser(a_lambda).__of__(self.root))
def test_copy_baseline(self):
folder1, folder2 = self._initFolders()
folder2.all_meta_types = FILE_META_TYPES
self._initPolicyAndUser()
self.assertTrue( 'file' in folder1.objectIds() )
self.assertFalse( 'file' in folder2.objectIds() )
cookie = folder1.manage_copyObjects( ids=( 'file', ) )
folder2.manage_pasteObjects( cookie )
self.assertTrue('file' in folder1.objectIds())
self.assertFalse('file' in folder2.objectIds())
self.assertTrue( 'file' in folder1.objectIds() )
self.assertTrue( 'file' in folder2.objectIds() )
cookie = folder1.manage_copyObjects(ids=('file', ))
folder2.manage_pasteObjects(cookie)
def test_copy_cant_read_source( self ):
self.assertTrue('file' in folder1.objectIds())
self.assertTrue('file' in folder2.objectIds())
def test_copy_cant_read_source(self):
folder1, folder2 = self._initFolders()
folder2.all_meta_types = FILE_META_TYPES
a_file = folder1._getOb( 'file' )
def _validate( a, c, n, v, *args, **kw ):
return aq_base( v ) is not aq_base( a_file )
self._initPolicyAndUser( v_lambda=_validate )
a_file = folder1._getOb('file')
cookie = folder1.manage_copyObjects( ids=( 'file', ) )
self._assertCopyErrorUnauth( folder2.manage_pasteObjects
, cookie
, ce_regex='Insufficient privileges'
)
def _validate(a, c, n, v, *args, **kw):
return aq_base(v) is not aq_base(a_file)
def test_copy_cant_create_target_metatype_not_supported( self ):
self._initPolicyAndUser(v_lambda=_validate)
from OFS.CopySupport import CopyError
cookie = folder1.manage_copyObjects(ids=('file', ))
self._assertCopyErrorUnauth(
folder2.manage_pasteObjects, cookie,
ce_regex='Insufficient privileges')
def test_copy_cant_create_target_metatype_not_supported(self):
folder1, folder2 = self._initFolders()
folder2.all_meta_types = ()
self._initPolicyAndUser()
cookie = folder1.manage_copyObjects( ids=( 'file', ) )
self._assertCopyErrorUnauth( folder2.manage_pasteObjects
, cookie
, ce_regex='Not Supported'
)
def test_move_baseline( self ):
cookie = folder1.manage_copyObjects(ids=('file', ))
self._assertCopyErrorUnauth(
folder2.manage_pasteObjects, cookie, ce_regex='Not Supported')
def test_move_baseline(self):
folder1, folder2 = self._initFolders()
folder2.all_meta_types = FILE_META_TYPES
self.assertTrue( 'file' in folder1.objectIds() )
self.assertFalse( 'file' in folder2.objectIds() )
self.assertTrue('file' in folder1.objectIds())
self.assertFalse('file' in folder2.objectIds())
self._initPolicyAndUser()
cookie = folder1.manage_cutObjects( ids=( 'file', ) )
folder2.manage_pasteObjects( cookie )
self.assertFalse( 'file' in folder1.objectIds() )
self.assertTrue( 'file' in folder2.objectIds() )
cookie = folder1.manage_cutObjects(ids=('file', ))
folder2.manage_pasteObjects(cookie)
def test_move_cant_read_source( self ):
from OFS.CopySupport import CopyError
self.assertFalse('file' in folder1.objectIds())
self.assertTrue('file' in folder2.objectIds())
def test_move_cant_read_source(self):
folder1, folder2 = self._initFolders()
folder2.all_meta_types = FILE_META_TYPES
a_file = folder1._getOb( 'file' )
a_file = folder1._getOb('file')
def _validate( a, c, n, v, *args, **kw ):
return aq_base( v ) is not aq_base( a_file )
def _validate(a, c, n, v, *args, **kw):
return aq_base(v) is not aq_base(a_file)
self._initPolicyAndUser( v_lambda=_validate )
self._initPolicyAndUser(v_lambda=_validate)
cookie = folder1.manage_cutObjects( ids=( 'file', ) )
self._assertCopyErrorUnauth( folder2.manage_pasteObjects
, cookie
, ce_regex='Insufficient privileges'
)
def test_move_cant_create_target_metatype_not_supported( self ):
from OFS.CopySupport import CopyError
cookie = folder1.manage_cutObjects(ids=('file', ))
self._assertCopyErrorUnauth(
folder2.manage_pasteObjects, cookie,
ce_regex='Insufficient privileges')
def test_move_cant_create_target_metatype_not_supported(self):
folder1, folder2 = self._initFolders()
folder2.all_meta_types = ()
self._initPolicyAndUser()
cookie = folder1.manage_cutObjects( ids=( 'file', ) )
self._assertCopyErrorUnauth( folder2.manage_pasteObjects
, cookie
, ce_regex='Not Supported'
)
def test_move_cant_create_target_metatype_not_allowed( self ):
from OFS.CopySupport import CopyError
cookie = folder1.manage_cutObjects(ids=('file', ))
self._assertCopyErrorUnauth(
folder2.manage_pasteObjects, cookie, ce_regex='Not Supported')
def test_move_cant_create_target_metatype_not_allowed(self):
folder1, folder2 = self._initFolders()
folder2.all_meta_types = FILE_META_TYPES
def _no_add_images_and_files(permission, object, context):
return permission != ADD_IMAGES_AND_FILES
self._initPolicyAndUser( c_lambda=_no_add_images_and_files )
cookie = folder1.manage_cutObjects( ids=( 'file', ) )
self._assertCopyErrorUnauth( folder2.manage_pasteObjects
, cookie
, ce_regex='Insufficient Privileges'
+ '.*%s' % ADD_IMAGES_AND_FILES
)
self._initPolicyAndUser(c_lambda=_no_add_images_and_files)
def test_move_cant_delete_source( self ):
cookie = folder1.manage_cutObjects(ids=('file', ))
self._assertCopyErrorUnauth(
folder2.manage_pasteObjects, cookie,
ce_regex='Insufficient Privileges.*%s' % ADD_IMAGES_AND_FILES)
from OFS.CopySupport import CopyError
from AccessControl.Permissions import delete_objects as DeleteObjects
def test_move_cant_delete_source(self):
from AccessControl.Permissions import delete_objects
folder1, folder2 = self._initFolders()
folder1.manage_permission( DeleteObjects, roles=(), acquire=0 )
folder1.manage_permission(delete_objects, roles=(), acquire=0)
folder2.all_meta_types = FILE_META_TYPES
def _no_delete_objects(permission, object, context):
return permission != DeleteObjects
self._initPolicyAndUser( c_lambda=_no_delete_objects )
cookie = folder1.manage_cutObjects( ids=( 'file', ) )
self._assertCopyErrorUnauth( folder2.manage_pasteObjects
, cookie
, ce_regex='Insufficient Privileges'
+ '.*%s' % DeleteObjects
)
return permission != delete_objects
self._initPolicyAndUser(c_lambda=_no_delete_objects)
def test_suite():
suite = unittest.TestSuite()
suite.addTest( unittest.makeSuite( TestCopySupport ) )
suite.addTest( unittest.makeSuite( TestCopySupportSecurity ) )
return suite
cookie = folder1.manage_cutObjects(ids=('file', ))
self._assertCopyErrorUnauth(
folder2.manage_pasteObjects, cookie,
ce_regex='Insufficient Privileges.*%s' % delete_objects)
......@@ -9,9 +9,3 @@ class TestFTPInterface(unittest.TestCase):
from zope.interface.verify import verifyClass
verifyClass(IFTPAccess, FTPInterface)
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(TestFTPInterface),
))
......@@ -42,12 +42,12 @@ class HistoryTests(unittest.TestCase):
t.description = None
t.note('Change 1')
t.commit()
time.sleep(0.02) # wait at least one Windows clock tick
time.sleep(0.02) # wait at least one Windows clock tick
hi.title = 'Second title'
t = transaction.get()
t.note('Change 2')
t.commit()
time.sleep(0.02) # wait at least one Windows clock tick
time.sleep(0.02) # wait at least one Windows clock tick
hi.title = 'Third title'
t = transaction.get()
t.note('Change 3')
......@@ -65,13 +65,13 @@ class HistoryTests(unittest.TestCase):
def test_manage_change_history(self):
r = self.hi.manage_change_history()
self.assertEqual(len(r),3) # three transactions
self.assertEqual(len(r), 3) # three transactions
for i in range(3):
entry = r[i]
# check no new keys show up without testing
self.assertEqual(len(entry.keys()),6)
self.assertEqual(len(entry.keys()), 6)
# the transactions are in newest-first order
self.assertEqual(entry['description'],'Change %i' % (3-i))
self.assertEqual(entry['description'], 'Change %i' % (3 - i))
self.assertTrue('key' in entry)
# lets not assume the size will stay the same forever
self.assertTrue('size' in entry)
......@@ -79,24 +79,18 @@ class HistoryTests(unittest.TestCase):
self.assertTrue('time' in entry)
if i:
# check times are increasing
self.assertTrue(entry['time']<r[i-1]['time'])
self.assertEqual(entry['user_name'],'')
self.assertTrue(entry['time'] < r[i - 1]['time'])
self.assertEqual(entry['user_name'], '')
def test_manage_historyCopy(self):
# we assume this works 'cos it's tested above
r = self.hi.manage_change_history()
# now we do the copy
self.hi.manage_historyCopy(
keys=[r[2]['key']]
)
self.hi.manage_historyCopy(keys=[r[2]['key']])
# do a commit, just like ZPublisher would
transaction.commit()
# check the body is as it should be, we assume (hopefully not foolishly)
# check the body is as it should be, we assume
# (hopefully not foolishly)
# that all other attributes will behave the same
self.assertEqual(self.hi.title,
'First title')
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(HistoryTests))
return suite
......@@ -5,7 +5,7 @@ from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManager import setSecurityPolicy
from AccessControl.SpecialUsers import emergency_user, nobody, system
from AccessControl.User import User # before SpecialUsers
from AccessControl.User import User # before SpecialUsers
from Acquisition import Implicit
from App.config import getConfiguration
from logging import getLogger
......@@ -22,28 +22,29 @@ from OFS.SimpleItem import SimpleItem
logger = getLogger('OFS.subscribers')
class FauxRoot( Implicit ):
class FauxRoot(Implicit):
id = '/'
def getPhysicalRoot( self ):
def getPhysicalRoot(self):
return self
def getPhysicalPath( self ):
def getPhysicalPath(self):
return ()
class FauxUser( Implicit ):
class FauxUser(Implicit):
def __init__( self, id, login ):
def __init__(self, id, login):
self._id = id
self._login = login
def getId( self ):
def getId(self):
return self._id
class DeleteFailed(Exception):
pass
......@@ -83,7 +84,7 @@ class ObjectManagerTests(PlacelessSetup, unittest.TestCase):
zcml.load_config('configure.zcml', OFS)
setDeprecatedManageAddDelete(ItemForDeletion)
def tearDown( self ):
def tearDown(self):
noSecurityManager()
getConfiguration().debug_mode = self.saved_cfg_debug_mode
super(ObjectManagerTests, self).tearDown()
......@@ -91,11 +92,11 @@ class ObjectManagerTests(PlacelessSetup, unittest.TestCase):
def setDebugMode(self, mode):
getConfiguration().debug_mode = mode
def _getTargetClass( self ):
def _getTargetClass(self):
return ObjectManagerWithIItem
def _makeOne( self, *args, **kw ):
return self._getTargetClass()( *args, **kw ).__of__( FauxRoot() )
def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw).__of__(FauxRoot())
def test_interfaces(self):
from OFS.interfaces import IObjectManager
......@@ -126,104 +127,105 @@ class ObjectManagerTests(PlacelessSetup, unittest.TestCase):
noSecurityManager()
setSecurityPolicy(oldPolicy)
def test_setObject_set_owner_with_no_user( self ):
def test_setObject_set_owner_with_no_user(self):
om = self._makeOne()
newSecurityManager( None, None )
si = SimpleItem( 'no_user' )
om._setObject( 'no_user', si )
self.assertEqual( si.__ac_local_roles__, None )
newSecurityManager(None, None)
si = SimpleItem('no_user')
om._setObject('no_user', si)
self.assertEqual(si.__ac_local_roles__, None)
def test_setObject_set_owner_with_emergency_user( self ):
def test_setObject_set_owner_with_emergency_user(self):
om = self._makeOne()
newSecurityManager( None, emergency_user )
si = SimpleItem( 'should_fail' )
self.assertEqual( si.__ac_local_roles__, None )
self.assertRaises( EmergencyUserCannotOwn
, om._setObject, 'should_fail', si )
newSecurityManager(None, emergency_user)
si = SimpleItem('should_fail')
self.assertEqual(si.__ac_local_roles__, None)
self.assertRaises(
EmergencyUserCannotOwn,
om._setObject, 'should_fail', si)
def test_setObject_set_owner_with_system_user( self ):
def test_setObject_set_owner_with_system_user(self):
om = self._makeOne()
newSecurityManager( None, system )
si = SimpleItem( 'system' )
self.assertEqual( si.__ac_local_roles__, None )
om._setObject( 'system', si )
self.assertEqual( si.__ac_local_roles__, None )
newSecurityManager(None, system)
si = SimpleItem('system')
self.assertEqual(si.__ac_local_roles__, None)
om._setObject('system', si)
self.assertEqual(si.__ac_local_roles__, None)
def test_setObject_set_owner_with_anonymous_user( self ):
def test_setObject_set_owner_with_anonymous_user(self):
om = self._makeOne()
newSecurityManager( None, nobody )
si = SimpleItem( 'anon' )
self.assertEqual( si.__ac_local_roles__, None )
om._setObject( 'anon', si )
self.assertEqual( si.__ac_local_roles__, None )
newSecurityManager(None, nobody)
si = SimpleItem('anon')
self.assertEqual(si.__ac_local_roles__, None)
om._setObject('anon', si)
self.assertEqual(si.__ac_local_roles__, None)
def test_setObject_set_owner_with_user( self ):
def test_setObject_set_owner_with_user(self):
om = self._makeOne()
user = User( 'user', '123', (), () ).__of__( FauxRoot() )
newSecurityManager( None, user )
si = SimpleItem( 'user_creation' )
self.assertEqual( si.__ac_local_roles__, None )
om._setObject( 'user_creation', si )
self.assertEqual( si.__ac_local_roles__, { 'user': ['Owner'] } )
user = User('user', '123', (), ()).__of__(FauxRoot())
newSecurityManager(None, user)
si = SimpleItem('user_creation')
self.assertEqual(si.__ac_local_roles__, None)
om._setObject('user_creation', si)
self.assertEqual(si.__ac_local_roles__, {'user': ['Owner']})
def test_setObject_set_owner_with_faux_user( self ):
def test_setObject_set_owner_with_faux_user(self):
om = self._makeOne()
user = FauxUser( 'user_id', 'user_login' ).__of__( FauxRoot() )
newSecurityManager( None, user )
si = SimpleItem( 'faux_creation' )
self.assertEqual( si.__ac_local_roles__, None )
om._setObject( 'faux_creation', si )
self.assertEqual( si.__ac_local_roles__, { 'user_id': ['Owner'] } )
user = FauxUser('user_id', 'user_login').__of__(FauxRoot())
newSecurityManager(None, user)
si = SimpleItem('faux_creation')
self.assertEqual(si.__ac_local_roles__, None)
om._setObject('faux_creation', si)
self.assertEqual(si.__ac_local_roles__, {'user_id': ['Owner']})
def test_setObject_no_set_owner_with_no_user( self ):
def test_setObject_no_set_owner_with_no_user(self):
om = self._makeOne()
newSecurityManager( None, None )
si = SimpleItem( 'should_be_okay' )
self.assertEqual( si.__ac_local_roles__, None )
om._setObject( 'should_be_okay', si, set_owner=0 )
self.assertEqual( si.__ac_local_roles__, None )
newSecurityManager(None, None)
si = SimpleItem('should_be_okay')
self.assertEqual(si.__ac_local_roles__, None)
om._setObject('should_be_okay', si, set_owner=0)
self.assertEqual(si.__ac_local_roles__, None)
def test_setObject_no_set_owner_with_emergency_user( self ):
def test_setObject_no_set_owner_with_emergency_user(self):
om = self._makeOne()
newSecurityManager( None, emergency_user )
si = SimpleItem( 'should_be_okay' )
self.assertEqual( si.__ac_local_roles__, None )
om._setObject( 'should_be_okay', si, set_owner=0 )
self.assertEqual( si.__ac_local_roles__, None )
newSecurityManager(None, emergency_user)
si = SimpleItem('should_be_okay')
self.assertEqual(si.__ac_local_roles__, None)
om._setObject('should_be_okay', si, set_owner=0)
self.assertEqual(si.__ac_local_roles__, None)
def test_setObject_no_set_owner_with_system_user( self ):
def test_setObject_no_set_owner_with_system_user(self):
om = self._makeOne()
newSecurityManager( None, system )
si = SimpleItem( 'should_be_okay' )
self.assertEqual( si.__ac_local_roles__, None )
om._setObject( 'should_be_okay', si, set_owner=0 )
self.assertEqual( si.__ac_local_roles__, None )
newSecurityManager(None, system)
si = SimpleItem('should_be_okay')
self.assertEqual(si.__ac_local_roles__, None)
om._setObject('should_be_okay', si, set_owner=0)
self.assertEqual(si.__ac_local_roles__, None)
def test_setObject_no_set_owner_with_anonymous_user( self ):
def test_setObject_no_set_owner_with_anonymous_user(self):
om = self._makeOne()
newSecurityManager( None, nobody )
si = SimpleItem( 'should_be_okay' )
self.assertEqual( si.__ac_local_roles__, None )
om._setObject( 'should_be_okay', si, set_owner=0 )
self.assertEqual( si.__ac_local_roles__, None )
newSecurityManager(None, nobody)
si = SimpleItem('should_be_okay')
self.assertEqual(si.__ac_local_roles__, None)
om._setObject('should_be_okay', si, set_owner=0)
self.assertEqual(si.__ac_local_roles__, None)
def test_setObject_no_set_owner_with_user( self ):
def test_setObject_no_set_owner_with_user(self):
om = self._makeOne()
user = User( 'user', '123', (), () ).__of__( FauxRoot() )
newSecurityManager( None, user )
si = SimpleItem( 'should_be_okay' )
self.assertEqual( si.__ac_local_roles__, None )
om._setObject( 'should_be_okay', si, set_owner=0 )
self.assertEqual( si.__ac_local_roles__, None )
user = User('user', '123', (), ()).__of__(FauxRoot())
newSecurityManager(None, user)
si = SimpleItem('should_be_okay')
self.assertEqual(si.__ac_local_roles__, None)
om._setObject('should_be_okay', si, set_owner=0)
self.assertEqual(si.__ac_local_roles__, None)
def test_setObject_no_set_owner_with_faux_user( self ):
def test_setObject_no_set_owner_with_faux_user(self):
om = self._makeOne()
user = FauxUser( 'user_id', 'user_login' ).__of__( FauxRoot() )
newSecurityManager( None, user )
si = SimpleItem( 'should_be_okay' )
self.assertEqual( si.__ac_local_roles__, None )
om._setObject( 'should_be_okay', si, set_owner=0 )
self.assertEqual( si.__ac_local_roles__, None )
user = FauxUser('user_id', 'user_login').__of__(FauxRoot())
newSecurityManager(None, user)
si = SimpleItem('should_be_okay')
self.assertEqual(si.__ac_local_roles__, None)
om._setObject('should_be_okay', si, set_owner=0)
self.assertEqual(si.__ac_local_roles__, None)
def test_delObject_before_delete(self):
# Test that manage_beforeDelete is called
......@@ -238,7 +240,7 @@ class ObjectManagerTests(PlacelessSetup, unittest.TestCase):
# Test exception behavior in manage_beforeDelete
# Manager user
self.setDebugMode(False)
newSecurityManager(None, system) # Manager
newSecurityManager(None, system) # Manager
om = self._makeOne()
ob = ItemForDeletion(fail_on_delete=True)
om._setObject(ob.getId(), ob)
......@@ -265,7 +267,7 @@ class ObjectManagerTests(PlacelessSetup, unittest.TestCase):
# Test exception behavior in manage_beforeDelete in debug mode
# Manager user
self.setDebugMode(True)
newSecurityManager(None, system) # Manager
newSecurityManager(None, system) # Manager
om = self._makeOne()
ob = ItemForDeletion(fail_on_delete=True)
om._setObject(ob.getId(), ob)
......@@ -494,9 +496,10 @@ class ObjectManagerTests(PlacelessSetup, unittest.TestCase):
self.assertTrue(filename.endswith('.zexp') or
filename.endswith('.xml'))
_marker = object()
class Test_checkValidId(unittest.TestCase):
class TestCheckValidId(unittest.TestCase):
def _callFUT(self, container, id, allow_dup=_marker):
from OFS.ObjectManager import checkValidId
......@@ -569,10 +572,3 @@ class Test_checkValidId(unittest.TestCase):
self.assertEqual(str(e),
'The id "abc__" is invalid because it ends with '
'two underscores.')
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(ObjectManagerTests),
unittest.makeSuite(Test_checkValidId),
))
......@@ -8,14 +8,19 @@ class DummyObject(CopySource):
def __init__(self, id, meta_type):
self.id = id
self.meta_type = meta_type
def cb_isMoveable(self):
return 1
def manage_afterAdd(self, item, container):
return
def manage_beforeDelete(self, item, container):
return
manage_afterAdd.__five_method__ = True
manage_beforeDelete.__five_method__ = True
def wl_isLocked(self):
return 0
......@@ -31,11 +36,12 @@ class TestOrderSupport(unittest.TestCase):
return
f = OrderedObjectManager()
f._objects = ( {'id':'o1', 'meta_type':'mt1'}
, {'id':'o2', 'meta_type':'mt2'}
, {'id':'o3', 'meta_type':'mt1'}
, {'id':'o4', 'meta_type':'mt2'}
)
f._objects = (
{'id': 'o1', 'meta_type': 'mt1'},
{'id': 'o2', 'meta_type': 'mt2'},
{'id': 'o3', 'meta_type': 'mt1'},
{'id': 'o4', 'meta_type': 'mt2'},
)
f.o1 = DummyObject('o1', 'mt1')
f.o2 = DummyObject('o2', 'mt2')
f.o3 = DummyObject('o3', 'mt1')
......@@ -54,119 +60,105 @@ class TestOrderSupport(unittest.TestCase):
f = self._makeOne()
method = getattr(f, methodname)
if rval == 'ValueError':
self.assertRaises( ValueError, method, *args )
self.assertRaises(ValueError, method, *args)
else:
self.assertEqual( method(*args), rval )
self.assertEqual( f.objectIds(), order )
self.assertEqual(method(*args), rval)
self.assertEqual(f.objectIds(), order)
def test_moveObjectsUp(self):
self._doCanonTest( 'moveObjectsUp',
( ( ( 'o4', 1 ), ['o1', 'o2', 'o4', 'o3'], 1 )
, ( ( 'o4', 2 ), ['o1', 'o4', 'o2', 'o3'], 1 )
, ( ( ('o1', 'o3'), 1 ), ['o1', 'o3', 'o2', 'o4'], 1 )
, ( ( ('o1', 'o3'), 9 ), ['o1', 'o3', 'o2', 'o4'], 1 )
, ( ( ('o2', 'o3'), 1 ), ['o2', 'o3', 'o1', 'o4'], 2 )
, ( ( ('o2', 'o3'), 1, ('o1', 'o2', 'o3', 'o4') ),
['o2', 'o3', 'o1', 'o4'], 2 )
, ( ( ('o2', 'o3'), 1, ('o2', 'o3', 'o4') ),
['o1', 'o2', 'o3', 'o4'], 0 )
, ( ( ('n2', 'o3'), 1 ), ['o1', 'o2', 'o3', 'o4'], 'ValueError')
, ( ( ('o3', 'o1'), 1 ), ['o1', 'o3', 'o2', 'o4'], 1 )
)
)
self._doCanonTest(
'moveObjectsUp',
((('o4', 1), ['o1', 'o2', 'o4', 'o3'], 1),
(('o4', 2), ['o1', 'o4', 'o2', 'o3'], 1),
((('o1', 'o3'), 1), ['o1', 'o3', 'o2', 'o4'], 1),
((('o1', 'o3'), 9), ['o1', 'o3', 'o2', 'o4'], 1),
((('o2', 'o3'), 1), ['o2', 'o3', 'o1', 'o4'], 2),
((('o2', 'o3'), 1, ('o1', 'o2', 'o3', 'o4')),
['o2', 'o3', 'o1', 'o4'], 2),
((('o2', 'o3'), 1, ('o2', 'o3', 'o4')),
['o1', 'o2', 'o3', 'o4'], 0),
((('n2', 'o3'), 1), ['o1', 'o2', 'o3', 'o4'], 'ValueError'),
((('o3', 'o1'), 1), ['o1', 'o3', 'o2', 'o4'], 1)))
def test_moveObjectsDown(self):
self._doCanonTest( 'moveObjectsDown',
( ( ( 'o1', 1 ), ['o2', 'o1', 'o3', 'o4'], 1 )
, ( ( 'o1', 2 ), ['o2', 'o3', 'o1', 'o4'], 1 )
, ( ( ('o2', 'o4'), 1 ), ['o1', 'o3', 'o2', 'o4'], 1 )
, ( ( ('o2', 'o4'), 9 ), ['o1', 'o3', 'o2', 'o4'], 1 )
, ( ( ('o2', 'o3'), 1 ), ['o1', 'o4', 'o2', 'o3'], 2 )
, ( ( ('o2', 'o3'), 1, ('o1', 'o2', 'o3', 'o4') ),
['o1', 'o4', 'o2', 'o3'], 2 )
, ( ( ('o2', 'o3'), 1, ('o1', 'o2', 'o3') ),
['o1', 'o2', 'o3', 'o4'], 0 )
, ( ( ('n2', 'o3'), 1 ), ['o1', 'o2', 'o3', 'o4'], 'ValueError')
, ( ( ('o4', 'o2'), 1 ), ['o1', 'o3', 'o2', 'o4'], 1 )
)
)
self._doCanonTest(
'moveObjectsDown',
((('o1', 1), ['o2', 'o1', 'o3', 'o4'], 1),
(('o1', 2), ['o2', 'o3', 'o1', 'o4'], 1),
((('o2', 'o4'), 1), ['o1', 'o3', 'o2', 'o4'], 1),
((('o2', 'o4'), 9), ['o1', 'o3', 'o2', 'o4'], 1),
((('o2', 'o3'), 1), ['o1', 'o4', 'o2', 'o3'], 2),
((('o2', 'o3'), 1, ('o1', 'o2', 'o3', 'o4')),
['o1', 'o4', 'o2', 'o3'], 2),
((('o2', 'o3'), 1, ('o1', 'o2', 'o3')),
['o1', 'o2', 'o3', 'o4'], 0),
((('n2', 'o3'), 1), ['o1', 'o2', 'o3', 'o4'], 'ValueError'),
((('o4', 'o2'), 1), ['o1', 'o3', 'o2', 'o4'], 1)))
def test_moveObjectsToTop(self):
self._doCanonTest( 'moveObjectsToTop',
( ( ( 'o4', ), ['o4', 'o1', 'o2', 'o3'], 1 )
, ( ( ('o1', 'o3'), ), ['o1', 'o3', 'o2', 'o4'], 1 )
, ( ( ('o2', 'o3'), ), ['o2', 'o3', 'o1', 'o4'], 2 )
, ( ( ('o2', 'o3'), ('o1', 'o2', 'o3', 'o4') ),
['o2', 'o3', 'o1', 'o4'], 2 )
, ( ( ('o2', 'o3'), ('o2', 'o3', 'o4') ),
['o1', 'o2', 'o3', 'o4'], 0 )
, ( ( ('n2', 'o3'), ), ['o1', 'o2', 'o3', 'o4'], 'ValueError')
, ( ( ('o3', 'o1'), ), ['o3', 'o1', 'o2', 'o4'], 1 )
)
)
self._doCanonTest(
'moveObjectsToTop',
((('o4', ), ['o4', 'o1', 'o2', 'o3'], 1),
((('o1', 'o3'), ), ['o1', 'o3', 'o2', 'o4'], 1),
((('o2', 'o3'), ), ['o2', 'o3', 'o1', 'o4'], 2),
((('o2', 'o3'), ('o1', 'o2', 'o3', 'o4')),
['o2', 'o3', 'o1', 'o4'], 2),
((('o2', 'o3'), ('o2', 'o3', 'o4')),
['o1', 'o2', 'o3', 'o4'], 0),
((('n2', 'o3'), ), ['o1', 'o2', 'o3', 'o4'], 'ValueError'),
((('o3', 'o1'), ), ['o3', 'o1', 'o2', 'o4'], 1)))
def test_moveObjectsToBottom(self):
self._doCanonTest( 'moveObjectsToBottom',
( ( ( 'o1', ), ['o2', 'o3', 'o4', 'o1'], 1 )
, ( ( ('o2', 'o4'), ), ['o1', 'o3', 'o2', 'o4'], 1 )
, ( ( ('o2', 'o3'), ), ['o1', 'o4', 'o2', 'o3'], 2 )
, ( ( ('o2', 'o3'), ('o1', 'o2', 'o3', 'o4') ),
['o1', 'o4', 'o2', 'o3'], 2 )
, ( ( ('o2', 'o3'), ('o1', 'o2', 'o3') ),
['o1', 'o2', 'o3', 'o4'], 0 )
, ( ( ('n2', 'o3'), ), ['o1', 'o2', 'o3', 'o4'], 'ValueError')
, ( ( ('o4', 'o2'), ), ['o1', 'o3', 'o4', 'o2'], 1 )
)
)
self._doCanonTest(
'moveObjectsToBottom',
((('o1', ), ['o2', 'o3', 'o4', 'o1'], 1),
((('o2', 'o4'), ), ['o1', 'o3', 'o2', 'o4'], 1),
((('o2', 'o3'), ), ['o1', 'o4', 'o2', 'o3'], 2),
((('o2', 'o3'), ('o1', 'o2', 'o3', 'o4')),
['o1', 'o4', 'o2', 'o3'], 2),
((('o2', 'o3'), ('o1', 'o2', 'o3')),
['o1', 'o2', 'o3', 'o4'], 0),
((('n2', 'o3'), ), ['o1', 'o2', 'o3', 'o4'], 'ValueError'),
((('o4', 'o2'), ), ['o1', 'o3', 'o4', 'o2'], 1)))
def test_orderObjects(self):
self._doCanonTest( 'orderObjects',
( ( ( 'id', 'id' ), ['o4', 'o3', 'o2', 'o1'], 3)
, ( ( 'meta_type', '' ), ['o1', 'o3', 'o2', 'o4'], 1)
, ( ( 'meta_type', 'n' ), ['o4', 'o2', 'o3', 'o1'], 3)
, ( ( 'position', 0 ), ['o1', 'o2', 'o3', 'o4'], 0)
, ( ( 'position', 1 ), ['o4', 'o3', 'o2', 'o1'], 3)
)
)
self._doCanonTest(
'orderObjects',
((('id', 'id'), ['o4', 'o3', 'o2', 'o1'], 3),
(('meta_type', ''), ['o1', 'o3', 'o2', 'o4'], 1),
(('meta_type', 'n'), ['o4', 'o2', 'o3', 'o1'], 3),
(('position', 0), ['o1', 'o2', 'o3', 'o4'], 0),
(('position', 1), ['o4', 'o3', 'o2', 'o1'], 3)))
def test_getObjectPosition(self):
self._doCanonTest( 'getObjectPosition',
( ( ( 'o2', ), ['o1', 'o2', 'o3', 'o4'], 1)
, ( ( 'o4', ), ['o1', 'o2', 'o3', 'o4'], 3)
, ( ( 'n2', ), ['o1', 'o2', 'o3', 'o4'], 'ValueError')
)
)
self._doCanonTest(
'getObjectPosition',
((('o2', ), ['o1', 'o2', 'o3', 'o4'], 1),
(('o4', ), ['o1', 'o2', 'o3', 'o4'], 3),
(('n2', ), ['o1', 'o2', 'o3', 'o4'], 'ValueError')))
def test_moveObjectToPosition(self):
self._doCanonTest( 'moveObjectToPosition',
( ( ( 'o2', 2 ), ['o1', 'o3', 'o2', 'o4'], 1)
, ( ( 'o4', 2 ), ['o1', 'o2', 'o4', 'o3'], 1)
, ( ( 'n2', 2 ), ['o1', 'o2', 'o3', 'o4'], 'ValueError')
)
)
self._doCanonTest(
'moveObjectToPosition',
((('o2', 2), ['o1', 'o3', 'o2', 'o4'], 1),
(('o4', 2), ['o1', 'o2', 'o4', 'o3'], 1),
(('n2', 2), ['o1', 'o2', 'o3', 'o4'], 'ValueError')))
def test_manage_renameObject(self):
self._doCanonTest( 'manage_renameObject',
( ( ( 'o2', 'n2' ), ['o1', 'n2', 'o3', 'o4'], None )
, ( ( 'o3', 'n3' ), ['o1', 'o2', 'n3', 'o4'], None )
)
)
self._doCanonTest(
'manage_renameObject',
((('o2', 'n2'), ['o1', 'n2', 'o3', 'o4'], None),
(('o3', 'n3'), ['o1', 'o2', 'n3', 'o4'], None)))
def test_tpValues(self):
f = self._makeOne()
f.o2.isPrincipiaFolderish = True
f.o3.isPrincipiaFolderish = True
f.o4.isPrincipiaFolderish = True
self.assertEqual( f.tpValues(), [f.o2, f.o3, f.o4] )
self.assertEqual(f.tpValues(), [f.o2, f.o3, f.o4])
f.setDefaultSorting('meta_type', False)
self.assertEqual( f.tpValues(), [f.o3, f.o2, f.o4] )
self.assertEqual(f.tpValues(), [f.o3, f.o2, f.o4])
f.setDefaultSorting('position', True)
self.assertEqual( f.tpValues(), [f.o4, f.o3, f.o2] )
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(TestOrderSupport),
))
self.assertEqual(f.tpValues(), [f.o4, f.o3, f.o2])
......@@ -13,12 +13,14 @@
##############################################################################
import unittest
def makeConnection():
import ZODB
from ZODB.DemoStorage import DemoStorage
s = DemoStorage()
return ZODB.DB( s ).open()
return ZODB.DB(s).open()
def createBigFile():
# Create a file that is several 1<<16 blocks of data big, to force the
......@@ -27,11 +29,10 @@ def createBigFile():
import cStringIO
import random
import string
size = (1<<16) * 5 + 12345
size = (1 << 16) * 5 + 12345
file = cStringIO.StringIO()
def addLetter(x, add=file.write, l=string.letters,
c=random.choice):
def addLetter(x, add=file.write, l=string.letters, c=random.choice):
add(c(l))
filter(addLetter, range(size))
......@@ -40,6 +41,7 @@ def createBigFile():
TESTFOLDER_NAME = 'RangesTestSuite_testFolder'
BIGFILE = createBigFile()
class TestRequestRange(unittest.TestCase):
# Test case setup and teardown
def setUp(self):
......@@ -58,14 +60,16 @@ class TestRequestRange(unittest.TestCase):
r['Application'] = a
self.root = a
self.app = makerequest(self.root, stdout=self.responseOut)
try: self.app._delObject(TESTFOLDER_NAME)
except AttributeError: pass
try:
self.app._delObject(TESTFOLDER_NAME)
except AttributeError:
pass
manage_addFolder(self.app, TESTFOLDER_NAME)
folder = getattr( self.app, TESTFOLDER_NAME )
folder = getattr(self.app, TESTFOLDER_NAME)
data = string.letters
manage_addFile( folder, 'file'
, file=data, content_type='text/plain')
manage_addFile(
folder, 'file', file=data, content_type='text/plain')
self.file = folder.file
self.data = data
......@@ -121,17 +125,13 @@ class TestRequestRange(unittest.TestCase):
body = self.doGET(req, rsp)
self.assertTrue(rsp.getStatus() == 416,
'Expected a 416 status, got %s' % rsp.getStatus())
self.assertTrue(rsp.getStatus() == 416)
expect_content_range = 'bytes */%d' % len(self.data)
content_range = rsp.getHeader('content-range')
self.assertFalse(content_range is None, 'No Content-Range header was set!')
self.assertTrue(content_range == expect_content_range,
'Received incorrect Content-Range header. Expected %s, got %s' % (
`expect_content_range`, `content_range`))
self.assertTrue(body == '', 'index_html returned %s' % `body`)
self.assertFalse(content_range is None)
self.assertEqual(content_range, expect_content_range)
self.assertEqual(body, '')
def expectOK(self, rangeHeader, if_range=None):
req = self.app.REQUEST
......@@ -142,10 +142,8 @@ class TestRequestRange(unittest.TestCase):
if if_range is not None:
req.environ['HTTP_IF_RANGE'] = if_range
body = self.doGET(req, rsp)
self.assertTrue(rsp.getStatus() == 200,
'Expected a 200 status, got %s' % rsp.getStatus())
self.doGET(req, rsp)
self.assertEqual(rsp.getStatus(), 200)
def expectSingleRange(self, range, start, end, if_range=None):
req = self.app.REQUEST
......@@ -157,24 +155,15 @@ class TestRequestRange(unittest.TestCase):
req.environ['HTTP_IF_RANGE'] = if_range
body = self.doGET(req, rsp)
self.assertTrue(rsp.getStatus() == 206,
'Expected a 206 status, got %s' % rsp.getStatus())
self.assertEqual(rsp.getStatus(), 206)
expect_content_range = 'bytes %d-%d/%d' % (
start, end - 1, len(self.data))
content_range = rsp.getHeader('content-range')
self.assertFalse(content_range is None, 'No Content-Range header was set!')
self.assertTrue(content_range == expect_content_range,
'Received incorrect Content-Range header. Expected %s, got %s' % (
`expect_content_range`, `content_range`))
self.assertFalse(rsp.getHeader('content-length') != str(len(body)),
'Incorrect Content-Length is set! Expected %s, got %s.' % (
str(len(body)), rsp.getHeader('content-length')))
self.assertTrue(body == self.data[start:end],
'Incorrect range returned, expected %s, got %s' % (
`self.data[start:end]`, `body`))
self.assertFalse(content_range is None)
self.assertEqual(content_range, expect_content_range)
self.assertEqual(rsp.getHeader('content-length'), str(len(body)))
self.assertEqual(body, self.data[start:end])
def expectMultipleRanges(self, range, sets, draft=0):
import cStringIO
......@@ -192,20 +181,14 @@ class TestRequestRange(unittest.TestCase):
body = self.doGET(req, rsp)
self.assertTrue(rsp.getStatus() == 206,
'Expected a 206 status, got %s' % rsp.getStatus())
self.assertFalse(rsp.getHeader('content-range'),
'The Content-Range header should not be set!')
self.assertTrue(rsp.getStatus() == 206)
self.assertFalse(rsp.getHeader('content-range'))
ct = rsp.getHeader('content-type').split(';')[0]
draftprefix = draft and 'x-' or ''
self.assertFalse(ct != 'multipart/%sbyteranges' % draftprefix,
"Incorrect Content-Type set. Expected 'multipart/%sbyteranges', "
"got %s" % (draftprefix, ct))
self.assertFalse(ct != 'multipart/%sbyteranges' % draftprefix)
if rsp.getHeader('content-length'):
self.assertFalse(rsp.getHeader('content-length') != str(len(body)),
'Incorrect Content-Length is set! Expected %s, got %s.' % (
str(len(body)), rsp.getHeader('content-length')))
self.assertFalse(rsp.getHeader('content-length') != str(len(body)))
# Decode the multipart message
bodyfile = cStringIO.StringIO('Content-Type: %s\n\n%s' % (
......@@ -224,25 +207,16 @@ class TestRequestRange(unittest.TestCase):
start, end, size = int(start), int(end), int(size)
end = end + 1
self.assertFalse(size != len(self.data),
'Part Content-Range header reported incorrect length. '
'Expected %d, got %d.' % (len(self.data), size))
self.assertFalse(size != len(self.data))
body = part.get_payload()
self.assertFalse(len(body) != end - start,
'Part (%d, %d) is of wrong length, expected %d, got %d.' % (
start, end, end - start, len(body)))
self.assertFalse(body != self.data[start:end],
'Part (%d, %d) has incorrect data. Expected %s, got %s.' % (
start, end, `self.data[start:end]`, `body`))
self.assertFalse(len(body) != end - start)
self.assertFalse(body != self.data[start:end])
add((start, end))
# Copmare the ranges used with the expected range sets.
self.assertFalse(returnedRanges != sets,
'Got unexpected sets, expected %s, got %s' % (
sets, returnedRanges))
# Compare the ranges used with the expected range sets.
self.assertFalse(returnedRanges != sets)
# Unsatisfiable requests
def testNegativeZero(self):
......@@ -286,7 +260,7 @@ class TestRequestRange(unittest.TestCase):
# Files of size 1<<16 are stored in linked Pdata objects. They are
# treated seperately in the range code.
self.uploadBigFile()
join = 3 * (1<<16) # A join between two linked objects
join = 3 * (1 << 16) # A join between two linked objects
start = join - 1000
end = join + 1000
range = '%d-%d' % (start, end - 1)
......@@ -311,12 +285,14 @@ class TestRequestRange(unittest.TestCase):
def testMultipleRangesBigFile(self):
self.uploadBigFile()
self.expectMultipleRanges('3-700,10-15,-10000',
self.expectMultipleRanges(
'3-700,10-15,-10000',
[(3, 701), (10, 16), (len(self.data) - 10000, len(self.data))])
def testMultipleRangesBigFileOutOfOrder(self):
self.uploadBigFile()
self.expectMultipleRanges('10-15,-10000,70000-80000',
self.expectMultipleRanges(
'10-15,-10000,70000-80000',
[(10, 16), (len(self.data) - 10000, len(self.data)),
(70000, 80001)])
......@@ -324,7 +300,8 @@ class TestRequestRange(unittest.TestCase):
self.uploadBigFile()
l = len(self.data)
start, end = l - 100, l + 100
self.expectMultipleRanges('3-700,%s-%s' % (start, end),
self.expectMultipleRanges(
'3-700,%s-%s' % (start, end),
[(3, 701), (len(self.data) - 100, len(self.data))])
# If-Range headers
......@@ -334,27 +311,26 @@ class TestRequestRange(unittest.TestCase):
self.expectSingleRange('10-25', 10, 26, if_range='garbage')
def testEqualIfRangeDate(self):
self.expectSingleRange('10-25', 10, 26,
self.expectSingleRange(
'10-25', 10, 26,
if_range=self.createLastModifiedDate())
def testIsModifiedIfRangeDate(self):
self.expectOK('21-25,10-20',
self.expectOK(
'21-25,10-20',
if_range=self.createLastModifiedDate(offset=-100))
def testIsNotModifiedIfRangeDate(self):
self.expectSingleRange('10-25', 10, 26,
self.expectSingleRange(
'10-25', 10, 26,
if_range=self.createLastModifiedDate(offset=100))
def testEqualIfRangeEtag(self):
self.expectSingleRange('10-25', 10, 26,
self.expectSingleRange(
'10-25', 10, 26,
if_range=self.file.http__etag())
def testNotEqualIfRangeEtag(self):
self.expectOK('10-25',
self.expectOK(
'10-25',
if_range=self.file.http__etag() + 'bar')
def test_suite():
suite = unittest.TestSuite()
suite.addTest( unittest.makeSuite( TestRequestRange ) )
return suite
......@@ -27,16 +27,17 @@ class TestItem(unittest.TestCase):
class RESPONSE(object):
handle_errors = True
item = self._makeOne()
def _raise_during_standard_error_message(*args, **kw):
raise ZeroDivisionError('testing')
item.standard_error_message = _raise_during_standard_error_message
try:
item.raise_standardErrorMessage(
error_type=OverflowError,
error_value='simple',
REQUEST=REQUEST(),
)
except:
error_type=OverflowError,
error_value='simple',
REQUEST=REQUEST(),
)
except Exception:
import sys
self.assertEqual(sys.exc_info()[0], OverflowError)
value = sys.exc_info()[1]
......@@ -45,20 +46,22 @@ class TestItem(unittest.TestCase):
def test_raise_StandardErrorMessage_TaintedString_errorValue(self):
from AccessControl.tainted import TaintedString
class REQUEST(object):
class RESPONSE(object):
handle_errors = True
item = self._makeOne()
def _raise_during_standard_error_message(*args, **kw):
raise ZeroDivisionError('testing')
item.standard_error_message = _raise_during_standard_error_message
try:
item.raise_standardErrorMessage(
error_type=OverflowError,
error_value=TaintedString('<simple>'),
REQUEST=REQUEST(),
)
except:
error_type=OverflowError,
error_value=TaintedString('<simple>'),
REQUEST=REQUEST(),
)
except Exception:
import sys
self.assertEqual(sys.exc_info()[0], OverflowError)
value = sys.exc_info()[1]
......@@ -113,10 +116,3 @@ class TestSimpleItem(unittest.TestCase):
REQUEST=REQUEST())
self.assertEquals(sem.kw.get('error_type'), 'BadRequest')
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(TestItem),
unittest.makeSuite(TestItem_w__name__),
unittest.makeSuite(TestSimpleItem),
))
......@@ -20,35 +20,24 @@ class UnitTestSecurityPolicy:
"""
Stub out the existing security policy for unit testing purposes.
"""
#
# Standard SecurityPolicy interface
#
def validate( self
, accessed=None
, container=None
, name=None
, value=None
, context=None
, roles=None
, *args
, **kw):
def validate(self, accessed=None, container=None, name=None, value=None,
context=None, roles=None, *args, **kw):
return 1
def checkPermission( self, permission, object, context) :
def checkPermission(self, permission, object, context):
return 1
class CruelSecurityPolicy:
"""Denies everything
"""
#
# Standard SecurityPolicy interface
#
def validate(self, accessed, container, name, value, *args):
from AccessControl import Unauthorized
raise Unauthorized, name
raise Unauthorized(name)
def checkPermission( self, permission, object, context) :
def checkPermission(self, permission, object, context):
return 0
......@@ -65,15 +54,16 @@ class ProtectedMethodSecurityPolicy:
if name is None:
raise Unauthorized
klass = value.im_self.__class__
roles = getattr(klass, name+'__roles__', object())
if roles is None: # ACCESS_PUBLIC
roles = getattr(klass, name + '__roles__', object())
if roles is None: # ACCESS_PUBLIC
return 1
raise Unauthorized(name)
class TestTraverse( unittest.TestCase ):
def setUp( self ):
class TestTraverse(unittest.TestCase):
def setUp(self):
import cStringIO
import transaction
from AccessControl import SecurityManager
......@@ -94,37 +84,36 @@ class TestTraverse( unittest.TestCase ):
r['Application'] = a
self.root = a
responseOut = self.responseOut = cStringIO.StringIO()
self.app = makerequest( self.root, stdout=responseOut )
manage_addFolder( self.app, 'folder1' )
folder1 = getattr( self.app, 'folder1' )
self.app = makerequest(self.root, stdout=responseOut)
manage_addFolder(self.app, 'folder1')
folder1 = getattr(self.app, 'folder1')
setattr(folder1, '+something', 'plus')
folder1.all_meta_types = \
({ 'name' : 'File'
, 'action' : 'manage_addFile'
, 'permission' : 'Add images and files'
}
,
)
folder1.all_meta_types = (
{'name': 'File',
'action': 'manage_addFile',
'permission': 'Add images and files'
},
)
manage_addFile( folder1, 'file'
, file='', content_type='text/plain')
manage_addFile(folder1, 'file',
file='', content_type='text/plain')
# Hack, we need a _p_mtime for the file, so we make sure that it
# has one. We use a subtransaction, which means we can rollback
# later and pretend we didn't touch the ZODB.
transaction.commit()
except:
except Exception:
self.connection.close()
raise
transaction.begin()
self.folder1 = getattr( self.app, 'folder1' )
self.folder1 = getattr(self.app, 'folder1')
self.policy = UnitTestSecurityPolicy()
self.oldPolicy = SecurityManager.setSecurityPolicy( self.policy )
newSecurityManager( None, self._makeUser().__of__( self.root ) )
self.oldPolicy = SecurityManager.setSecurityPolicy(self.policy)
newSecurityManager(None, self._makeUser().__of__(self.root))
def tearDown( self ):
def tearDown(self):
import transaction
self._setupSecurity()
del self.oldPolicy
......@@ -140,14 +129,16 @@ class TestTraverse( unittest.TestCase ):
def _makeUser(self):
from Acquisition import Implicit
class UnitTestUser(Implicit):
"""
Stubbed out manager for unit testing purposes.
"""
def getId( self ):
def getId(self):
return 'unit_tester'
getUserName = getId
def allowed( self, object, object_roles=None ):
def allowed(self, object, object_roles=None):
return 1
return UnitTestUser()
......@@ -199,11 +190,11 @@ class TestTraverse( unittest.TestCase ):
class Restricted(SimpleItem):
"""Instance we'll check with ProtectedMethodSecurityPolicy
"""
getId__roles__ = None # ACCESS_PUBLIC
getId__roles__ = None # ACCESS_PUBLIC
def getId(self):
return self.id
private__roles__ = () # ACCESS_PRIVATE
private__roles__ = () # ACCESS_PRIVATE
def private(self):
return 'private!'
......@@ -228,33 +219,30 @@ class TestTraverse( unittest.TestCase ):
verifyClass(ITraversable, Traversable)
def testTraversePath( self ):
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertTrue(
self.folder1.unrestrictedTraverse( ('', 'folder1', 'file' ) ))
def testTraversePath(self):
self.assertTrue('file' in self.folder1.objectIds())
self.assertTrue(
self.folder1.unrestrictedTraverse( ('', 'folder1' ) ))
self.folder1.unrestrictedTraverse(('', 'folder1', 'file')))
self.assertTrue(self.folder1.unrestrictedTraverse(('', 'folder1')))
def testTraverseURLNoSlash( self ):
self.assertTrue( 'file' in self.folder1.objectIds() )
self.assertTrue(
self.folder1.unrestrictedTraverse( '/folder1/file' ))
self.assertTrue(
self.folder1.unrestrictedTraverse( '/folder1' ))
def testTraverseURLNoSlash(self):
self.assertTrue('file' in self.folder1.objectIds())
self.assertTrue(self.folder1.unrestrictedTraverse('/folder1/file'))
self.assertTrue(self.folder1.unrestrictedTraverse('/folder1'))
def testTraverseURLSlash( self ):
def testTraverseURLSlash(self):
self.assertTrue('file' in self.folder1.objectIds())
self.assertTrue(self.folder1.unrestrictedTraverse( '/folder1/file/'))
self.assertTrue(self.folder1.unrestrictedTraverse( '/folder1/'))
self.assertTrue(self.folder1.unrestrictedTraverse('/folder1/file/'))
self.assertTrue(self.folder1.unrestrictedTraverse('/folder1/'))
def testTraverseToNone( self ):
def testTraverseToNone(self):
self.assertRaises(
KeyError,
self.folder1.unrestrictedTraverse, ('', 'folder1', 'file2' ) )
self.folder1.unrestrictedTraverse, ('', 'folder1', 'file2'))
self.assertRaises(
KeyError, self.folder1.unrestrictedTraverse, '/folder1/file2' )
KeyError, self.folder1.unrestrictedTraverse, '/folder1/file2')
self.assertRaises(
KeyError, self.folder1.unrestrictedTraverse, '/folder1/file2/' )
KeyError, self.folder1.unrestrictedTraverse, '/folder1/file2/')
def testTraverseMethodRestricted(self):
from AccessControl import Unauthorized
......@@ -324,7 +312,7 @@ class TestTraverse( unittest.TestCase ):
bb = self._makeBoboTraversableWithAcquisition()
bb = bb.__of__(self.root)
self.assertRaises(Unauthorized,
bb.restrictedTraverse, 'folder1')
bb.restrictedTraverse, 'folder1')
def testBoboTraverseToAcquiredAttribute(self):
# Verify it's possible to use __bobo_traverse__ to an acquired
......@@ -350,11 +338,12 @@ class TestTraverse( unittest.TestCase ):
bb = self._makeBoboTraversableWithAcquisition()
bb = bb.__of__(folder)
self.assertRaises(Unauthorized,
self.root.folder1.restrictedTraverse, 'stuff')
self.root.folder1.restrictedTraverse, 'stuff')
def testBoboTraverseTraversalDefault(self):
from OFS.SimpleItem import SimpleItem
from ZPublisher.interfaces import UseTraversalDefault
class BoboTraversableUseTraversalDefault(SimpleItem):
"""
A BoboTraversable class which may use "UseTraversalDefault"
......@@ -364,10 +353,10 @@ class TestTraverse( unittest.TestCase ):
default = 'Default'
def __bobo_traverse__(self, request, name):
if name == 'normal': return 'Normal'
if name == 'normal':
return 'Normal'
raise UseTraversalDefault
bb = BoboTraversableUseTraversalDefault()
# normal access -- no traversal default used
self.assertEqual(bb.unrestrictedTraverse('normal'), 'Normal')
......@@ -377,7 +366,8 @@ class TestTraverse( unittest.TestCase ):
si = SimpleItem()
si.default_acquire = 'Default_Acquire'
si.bb = bb
self.assertEqual(si.unrestrictedTraverse('bb/default_acquire'), 'Default_Acquire')
self.assertEqual(si.unrestrictedTraverse('bb/default_acquire'),
'Default_Acquire')
def testAcquiredAttributeDenial(self):
# Verify that restrictedTraverse raises the right kind of exception
......@@ -387,17 +377,17 @@ class TestTraverse( unittest.TestCase ):
from AccessControl import Unauthorized
from AccessControl.SecurityManagement import newSecurityManager
self._setupSecurity(CruelSecurityPolicy())
newSecurityManager( None, self._makeUser().__of__( self.root ) )
newSecurityManager(None, self._makeUser().__of__(self.root))
self.root.stuff = 'stuff here'
self.assertRaises(Unauthorized,
self.app.folder1.restrictedTraverse, 'stuff')
self.app.folder1.restrictedTraverse, 'stuff')
def testDefaultValueWhenUnathorized(self):
# Test that traversing to an unauthorized object returns
# the default when provided
from AccessControl.SecurityManagement import newSecurityManager
self._setupSecurity(CruelSecurityPolicy())
newSecurityManager( None, self._makeUser().__of__( self.root ) )
newSecurityManager(None, self._makeUser().__of__(self.root))
self.root.stuff = 'stuff here'
self.assertEqual(
self.root.folder1.restrictedTraverse('stuff', 42), 42)
......@@ -429,7 +419,7 @@ class TestTraverse( unittest.TestCase ):
self.assertTrue(
aq_base(self.root.folder1.file.restrictedTraverse('../..')) is
aq_base(self.root))
def testTraverseToNameStartingWithPlus(self):
# Verify it's possible to traverse to a name such as +something
self.assertTrue(
......@@ -450,14 +440,14 @@ def test_traversable():
>>> zcml.load_config("configure.zcml", Products.Five)
>>> from Testing.makerequest import makerequest
>>> self.app = makerequest(self.app)
``SimpleContent`` is a traversable class by default. Its fallback
traverser should raise NotFound when traversal fails. (Note: If
we return None in __fallback_traverse__, this test passes but for
the wrong reason: None doesn't have a docstring so BaseRequest
raises NotFoundError.)
>>> from Products.Five.tests.testing import simplecontent
>>> from Products.Five.tests.testing import simplecontent
>>> simplecontent.manage_addSimpleContent(self.folder, 'testoid',
... 'Testoid')
>>> from zExceptions import NotFound
......@@ -474,7 +464,7 @@ def test_traversable():
... xmlns:meta="http://namespaces.zope.org/meta"
... xmlns:browser="http://namespaces.zope.org/browser"
... xmlns:five="http://namespaces.zope.org/five">
...
...
... <!-- make the zope2.Public permission work -->
... <meta:redefinePermission from="zope2.Public" to="zope.Public" />
...
......@@ -527,7 +517,6 @@ def test_traversable():
>>> self.folder.fancy.unrestrictedTraverse('fancyview').index_html({})
'fancyview'
Note that during publishing, if the original __bobo_traverse__ method
*does* raise AttributeError or KeyError, we can get normal view look-up.
......@@ -566,14 +555,14 @@ def test_traversable():
... 'an_attribute').index_html({})
'an_attribute'
If we traverse to something via an adapter lookup and it provides IAcquirer,
it should get acquisition-wrapped so we can acquire attributes implicitly:
If we traverse to something via an adapter lookup and it provides
IAcquirer, it should get acquisition-wrapped so we can acquire
attributes implicitly:
>>> acquirer = self.folder.unrestrictedTraverse('acquirer')
>>> acquirer.fancy
<FancyContent ...>
Clean up:
>>> from zope.component.testing import tearDown
......@@ -596,6 +585,7 @@ def test_traversable():
False
"""
def test_view_doesnt_shadow_attribute():
"""
Test that views don't shadow attributes, e.g. items in a folder.
......
......@@ -91,23 +91,21 @@ class TestsOfBroken(unittest.TestCase):
"_p_mtime",
"_p_oid",
"_p_serial",
"_p_state",
]
"_p_state"]
PERSISTENCE_METHODS = ["_p_deactivate",
"_p_activate",
"_p_invalidate",
"_p_getattr",
"_p_setattr",
"_p_delattr",
]
"_p_delattr"]
inst = Broken(self, OID, ('Products.MyProduct.MyClass', 'MyClass'))
for attr_name in PERSISTENCE_ATTRS:
attr = getattr(inst, attr_name) # doesn't raise
getattr(inst, attr_name) # doesn't raise
for meth_name in PERSISTENCE_METHODS:
meth = getattr(inst, meth_name) # doesn't raise
getattr(inst, meth_name) # doesn't raise
class TestsIntegratedBroken(base.TestCase):
......@@ -146,7 +144,3 @@ def test_suite():
suite.addTest(unittest.makeSuite(TestsOfBroken))
suite.addTest(unittest.makeSuite(TestsIntegratedBroken))
return suite
def main():
unittest.main(defaultTest='test_suite')
......@@ -23,54 +23,70 @@ from OFS.OrderedFolder import OrderedFolder
from zope.component import testing, eventtesting
def setUp(test):
testing.setUp(test)
eventtesting.setUp(test)
class DontComplain(object):
def _verifyObjectPaste(self, object, validate_src=1):
pass
def cb_isMoveable(self):
return True
def cb_isCopyable(self):
return True
class NotifyBase(DontComplain):
def manage_afterAdd(self, item, container):
print 'old manage_afterAdd %s %s %s' % (self.getId(), item.getId(),
container.getId())
print('old manage_afterAdd %s %s %s' % (
self.getId(), item.getId(), container.getId()))
super(NotifyBase, self).manage_afterAdd(item, container)
manage_afterAdd.__five_method__ = True # Shut up deprecation warnings
manage_afterAdd.__five_method__ = True # Shut up deprecation warnings
def manage_beforeDelete(self, item, container):
super(NotifyBase, self).manage_beforeDelete(item, container)
print 'old manage_beforeDelete %s %s %s' % (self.getId(), item.getId(),
container.getId())
manage_beforeDelete.__five_method__ = True # Shut up deprecation warnings
print('old manage_beforeDelete %s %s %s' % (
self.getId(), item.getId(), container.getId()))
manage_beforeDelete.__five_method__ = True # Shut up deprecation warnings
def manage_afterClone(self, item):
print 'old manage_afterClone %s %s' % (self.getId(), item.getId())
print('old manage_afterClone %s %s' % (self.getId(), item.getId()))
super(NotifyBase, self).manage_afterClone(item)
manage_afterClone.__five_method__ = True # Shut up deprecation warnings
manage_afterClone.__five_method__ = True # Shut up deprecation warnings
class MyApp(Folder):
def getPhysicalRoot(self):
return self
class MyFolder(NotifyBase, Folder):
pass
class MyOrderedFolder(NotifyBase, OrderedFolder):
pass
class MyContent(NotifyBase, SimpleItem):
def __init__(self, id):
self._setId(id)
# These don't have manage_beforeDelete & co methods
class MyNewContent(DontComplain, SimpleItem):
def __init__(self, id):
self._setId(id)
class MyNewFolder(DontComplain, Folder):
pass
......
import unittest
_marker = object()
class Test__registerClass(unittest.TestCase):
class TestRegisterClass(unittest.TestCase):
def setUp(self):
from zope.component.testing import setUp
......@@ -22,8 +23,8 @@ class Test__registerClass(unittest.TestCase):
from zope.component.testing import tearDown
import OFS.metaconfigure
# restore registrations
OFS.metaconfigure._meta_type_regs[:] = self._old_mt_regs
OFS.metaconfigure._register_monkies[:] = self._old_monkies
OFS.metaconfigure._meta_type_regs[:] = self._old_mt_regs
OFS.metaconfigure._register_monkies[:] = self._old_monkies
if self._old_metatypes is not _marker:
import Products
Products.meta_types = self._old_metatypes
......@@ -35,7 +36,7 @@ class Test__registerClass(unittest.TestCase):
tearDown()
def _callFUT(self, klass, meta_type, permission,
addview=None, icon=None, global_=False):
addview=None, icon=None, global_=False):
from OFS.metaconfigure import _registerClass
_registerClass(klass, meta_type, permission, addview, icon, global_)
......@@ -45,6 +46,7 @@ class Test__registerClass(unittest.TestCase):
pass
else:
from zope.interface import implements
class Dummy(object):
implements(ifaces)
return Dummy
......@@ -53,10 +55,13 @@ class Test__registerClass(unittest.TestCase):
from zope.component import provideUtility
from zope.interface import implements
from zope.security.interfaces import IPermission
class Perm:
implements(IPermission)
def __init__(self, title):
self. title = title
if title is None:
title = name.capitalize()
provideUtility(Perm(title), name=name)
......@@ -65,9 +70,8 @@ class Test__registerClass(unittest.TestCase):
import OFS.metaconfigure
import Products
return (getattr(Products, 'meta_types', _marker),
OFS.metaconfigure._register_monkies,
OFS.metaconfigure._meta_type_regs,
)
OFS.metaconfigure._register_monkies,
OFS.metaconfigure._meta_type_regs)
def test_minimal(self):
klass = self._makeClass()
......@@ -80,7 +84,7 @@ class Test__registerClass(unittest.TestCase):
self.assertEqual(len(mt), 1)
self.assertEqual(mt[0]['name'], 'Dummy')
self.assertEqual(mt[0]['action'], '')
self.assertEqual(mt[0]['product'], 'OFS') # XXX why?
self.assertEqual(mt[0]['product'], 'OFS')
self.assertEqual(mt[0]['permission'], 'Perm')
self.assertEqual(mt[0]['visibility'], None)
self.assertEqual(mt[0]['interfaces'], ())
......@@ -109,8 +113,10 @@ class Test__registerClass(unittest.TestCase):
def test_w_interfaces(self):
from zope.interface import Interface
class IDummy(Interface):
pass
klass = self._makeClass((IDummy,))
self._registerPermission('perm')
......@@ -135,8 +141,3 @@ class Test__registerClass(unittest.TestCase):
self.assertEqual(klass.meta_type, 'Dummy')
mt, monkies, mt_regs = self._getRegistered()
self.assertEqual(len(mt), 1)
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(Test__registerClass),
))
......@@ -167,6 +167,7 @@ def test_registerClass():
[]
"""
def test_suite():
from Testing.ZopeTestCase import ZopeDocTestSuite
return ZopeDocTestSuite()
......@@ -20,6 +20,7 @@ import sys
from Products.Five.tests import testing
sys.path.append(testing.__path__[0])
def test_registerPackage():
"""
Testing registerPackage
......@@ -33,7 +34,7 @@ def test_registerPackage():
Make sure a python package with a valid initialize gets its
initialize function called::
>>> configure_zcml = '''
... <configure
... xmlns="http://namespaces.zope.org/zope"
......@@ -45,8 +46,8 @@ def test_registerPackage():
... />
... </configure>'''
>>> zcml.load_string(configure_zcml)
We need to load the product as well. This would normally happen during
We need to load the product as well. This would normally happen during
Zope startup, but in the test, we're already too late.
>>> import Zope2
......
......@@ -27,7 +27,7 @@ class TestMaybeWarnDeprecated(unittest.TestCase):
deprecatedManageAddDeleteClasses[:])
self.deprecatedManageAddDeleteClasses = (
deprecatedManageAddDeleteClasses)
# Add a class to make sure there is at least one because an empty
# Add a class to make sure there is at least one because an empty
# deprecatedManageAddDeleteClasses list is special cased
self.deprecatedManageAddDeleteClasses.append(int)
# Pick up log messages
......@@ -66,8 +66,10 @@ class TestMaybeWarnDeprecated(unittest.TestCase):
class Deprecated(object):
def manage_afterAdd(self):
pass
class ASubClass(Deprecated):
pass
self.deprecatedManageAddDeleteClasses.append(Deprecated)
self.assertLog(ASubClass, '')
......
......@@ -18,11 +18,11 @@ import unittest
class BasicUserFolderTests(unittest.TestCase):
def _getTargetClass(self):
from OFS.userfolder import BasicUserFolder
return BasicUserFolder
def test_manage_users_security_initialized(self):
uf = self._getTargetClass()()
self.assertTrue(hasattr(uf, 'manage_users__roles__'))
......@@ -119,7 +119,7 @@ class UserFolderTests(unittest.TestCase):
app.manage_role('Owner', ['Add Folders'])
app.manage_addLocalRoles('user1', ['Owner'])
self.assertTrue(user.has_permission('Add Folders', app))
def testAuthenticate(self):
app = self._makeApp()
uf = self._makeOne(app)
......@@ -176,11 +176,3 @@ class UserFolderTests(unittest.TestCase):
from AccessControl import Unauthorized
app = self._makeApp()
self.assertRaises(Unauthorized, app.restrictedTraverse, 'doc')
def test_suite():
suite = unittest.TestSuite((
unittest.makeSuite(BasicUserFolderTests),
unittest.makeSuite(UserFolderTests),
))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment