Commit efe61189 authored by Hanno Schlichting's avatar Hanno Schlichting

flake8 ZPublisher/Testing

parent 2e9d01ea
The Testing package is a set of shared routines for the Zope unit
testing framework. From Zope 2.8 these are more easily accessed
by using the ZopeTestCase package. See ZopeTestCase/doc for more
information.
To use Testing without ZopeTestCase:
1. Make a 'tests' subdirectory.
2. Copy 'framework.py' into 'tests' from any other package's 'tests'.
Once a test suite has been set up, you can add test modules:
1. Create a file with a name matching 'test*.py'.
2. Define one or more subclasses of 'unittest.TestCase'. The unittest
module is imported by the framework.
3. Define methods for the test classes. Each method's name must start
with 'test'. It should test one small case, using a Python
'assert' statement. Here's a minimal example:
class testClass1(unittest.TestCase):
def testAddition(self):
assert 1 + 1 == 2, 'Addition failed!'
4. You can add 'setUp' and 'tearDown' methods that are automatically
called at the start and end of the test suite.
5. Follow the instructions in 'framework.py' about adding lines to the
top and bottom of the file.
Now you can run the test as "python path/to/tests/testName.py", or
simply go to the 'tests' directory and type "python testName.py".
......@@ -3,10 +3,12 @@ from glob import glob
import ZODB
from ZODB.FileStorage import FileStorage
def makeDB():
s = FileStorage('fs_tmp__%s' % os.getpid())
return ZODB.DB(s)
def cleanDB():
for fn in glob('fs_tmp__*'):
os.remove(fn)
......@@ -15,7 +15,9 @@
After Marius Gedminas' functional.py module for Zope3.
"""
import sys, re, base64
import base64
import re
import sys
import transaction
import sandbox
import interfaces
......@@ -58,8 +60,8 @@ class Functional(sandbox.Sandboxed):
'''Publishes the object at 'path' returning a response object.'''
from StringIO import StringIO
from ZPublisher.Request import Request
from ZPublisher.Response import Response
from ZPublisher.HTTPRequest import HTTPRequest as Request
from ZPublisher.HTTPResponse import HTTPResponse as Response
from ZPublisher.Publish import publish_module
# Commit the sandbox for good measure
......@@ -82,7 +84,7 @@ class Functional(sandbox.Sandboxed):
elif len(p) == 2:
[env['PATH_INFO'], env['QUERY_STRING']] = p
else:
raise TypeError, ''
raise TypeError('')
if basic:
env['HTTP_AUTHORIZATION'] = "Basic %s" % base64.encodestring(basic)
......@@ -99,8 +101,7 @@ class Functional(sandbox.Sandboxed):
publish_module('Zope2',
debug=not handle_errors,
request=request,
response=response,
)
response=response)
return ResponseWrapper(response, outstream, path)
......@@ -140,4 +141,3 @@ class ResponseWrapper:
def getCookie(self, name):
'''Returns a response cookie.'''
return self.cookies.get(name)
......@@ -90,11 +90,14 @@ class DocResponseWrapper(ResponseWrapper):
return "%s\n" % (self.header_output)
basicre = re.compile('Basic (.+)?:(.+)?$')
headerre = re.compile('(\S+): (.+)$')
def split_header(header):
return headerre.match(header).group(1, 2)
basicre = re.compile('Basic (.+)?:(.+)?$')
def auth_header(header):
match = basicre.match(header)
if match:
......@@ -111,6 +114,7 @@ def auth_header(header):
def getRootFolder():
return AppZapper().app()
def sync():
getRootFolder()._p_jar.sync()
......@@ -124,7 +128,7 @@ def http(request_string, handle_errors=True):
import urllib
import rfc822
from cStringIO import StringIO
from ZPublisher.Response import Response
from ZPublisher.HTTPResponse import HTTPResponse as Response
from ZPublisher.Publish import publish_module
# Commit work done by previous python code.
......@@ -136,7 +140,7 @@ def http(request_string, handle_errors=True):
# Split off and parse the command line
l = request_string.find('\n')
command_line = request_string[:l].rstrip()
request_string = request_string[l+1:]
request_string = request_string[l + 1:]
method, path, protocol = command_line.split()
path = urllib.unquote(path)
......@@ -154,7 +158,7 @@ def http(request_string, handle_errors=True):
elif len(p) == 2:
[env['PATH_INFO'], env['QUERY_STRING']] = p
else:
raise TypeError, ''
raise TypeError('')
header_output = HTTPHeaderOutput(
protocol, ('x-content-type-warning', 'x-powered-by',
......@@ -173,7 +177,7 @@ def http(request_string, handle_errors=True):
name = 'HTTP_' + name
env[name] = value.rstrip()
if env.has_key('HTTP_AUTHORIZATION'):
if 'HTTP_AUTHORIZATION' in env:
env['HTTP_AUTHORIZATION'] = auth_header(env['HTTP_AUTHORIZATION'])
outstream = StringIO()
......@@ -183,8 +187,8 @@ def http(request_string, handle_errors=True):
response=response,
stdin=instream,
environ=env,
debug=not handle_errors,
)
debug=not handle_errors)
header_output.setResponseStatus(response.getStatus(), response.errmsg)
header_output.setResponseHeaders(response.headers)
header_output.headersl.extend(response._cookie_list())
......@@ -246,6 +250,7 @@ class ZopeSuiteFactory:
test_instance = test_class()
kwsetUp = self._kw.get('setUp')
def setUp(test):
test_instance.setUp()
test.globs['test'] = test
......@@ -264,6 +269,7 @@ class ZopeSuiteFactory:
self._kw['setUp'] = setUp
kwtearDown = self._kw.get('tearDown')
def tearDown(test):
if kwtearDown is not None:
kwtearDown(test_instance)
......@@ -273,8 +279,8 @@ class ZopeSuiteFactory:
def setup_optionflags(self):
if 'optionflags' not in self._kw:
self._kw['optionflags'] = (doctest.ELLIPSIS
| doctest.NORMALIZE_WHITESPACE)
self._kw['optionflags'] = (
doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
class FunctionalSuiteFactory(ZopeSuiteFactory):
......@@ -285,7 +291,8 @@ class FunctionalSuiteFactory(ZopeSuiteFactory):
globs['http'] = http
globs['getRootFolder'] = getRootFolder
globs['sync'] = sync
globs['user_auth'] = base64.encodestring('%s:%s' % (user_name, user_password))
globs['user_auth'] = base64.encodestring(
'%s:%s' % (user_name, user_password))
def setup_test_class(self):
test_class = self._kw.get('test_class', FunctionalTestCase)
......@@ -297,7 +304,7 @@ class FunctionalSuiteFactory(ZopeSuiteFactory):
warnings.warn(("The test_class you are using doesn't "
"subclass from ZopeTestCase.Functional. "
"Please fix that."), UserWarning, 4)
if not 'Functional' in name:
if 'Functional' not in name:
name = 'Functional%s' % name
test_class = type(name, (Functional, test_class), {})
......@@ -306,24 +313,27 @@ class FunctionalSuiteFactory(ZopeSuiteFactory):
def setup_optionflags(self):
if 'optionflags' not in self._kw:
self._kw['optionflags'] = (doctest.ELLIPSIS
| doctest.REPORT_NDIFF
| doctest.NORMALIZE_WHITESPACE)
self._kw['optionflags'] = (
doctest.ELLIPSIS | doctest.REPORT_NDIFF |
doctest.NORMALIZE_WHITESPACE)
def ZopeDocTestSuite(module=None, **kw):
module = doctest._normalize_module(module)
return ZopeSuiteFactory(module, **kw).doctestsuite()
def ZopeDocFileSuite(*paths, **kw):
if kw.get('module_relative', True):
kw['package'] = doctest._normalize_module(kw.get('package'))
return ZopeSuiteFactory(*paths, **kw).docfilesuite()
def FunctionalDocTestSuite(module=None, **kw):
module = doctest._normalize_module(module)
return FunctionalSuiteFactory(module, **kw).doctestsuite()
def FunctionalDocFileSuite(*paths, **kw):
if kw.get('module_relative', True):
kw['package'] = doctest._normalize_module(kw.get('package'))
......@@ -335,5 +345,4 @@ __all__ = [
'ZopeDocFileSuite',
'FunctionalDocTestSuite',
'FunctionalDocFileSuite',
]
]
# Default test runner
import unittest
TestRunner = unittest.TextTestRunner
def debug():
test_suite().debug()
def pdebug():
import pdb
pdb.run('debug()')
def test_suite():
# The default test suite includes every subclass of TestCase in
# the module, with 'test' as the test method prefix.
ClassType = type(unittest.TestCase)
tests = []
for v in globals().values():
if isinstance(v, ClassType) and issubclass(v, unittest.TestCase):
tests.append(unittest.makeSuite(v))
if len(tests) > 1:
return unittest.TestSuite(tests)
if len(tests) == 1:
return tests[0]
return
class Dummy:
'''Utility class for quick & dirty instances'''
def __init__(self, **kw):
self.__dict__.update(kw)
def __str__( self ):
return 'Dummy(%s)' % `self.__dict__`
__repr__ = __str__
import os
import logging
import ZODB
LOG = logging.getLogger('Testing')
def getStorage():
""" Return a storage instance for running ZopeTestCase based
""" Return a storage instance for running ZopeTestCase based
tests. By default a DemoStorage is used. Setting
$TEST_ZEO_HOST/TEST_ZEO_PORT environment variables allows you
to use a ZEO server instead. A file storage can be configured
......@@ -15,14 +14,14 @@ def getStorage():
get = os.environ.get
if os.environ.has_key('TEST_ZEO_HOST') and os.environ.has_key('TEST_ZEO_PORT'):
if 'TEST_ZEO_HOST' in os.environ and 'TEST_ZEO_PORT' in os.environ:
from ZEO.ClientStorage import ClientStorage
zeo_host = get('TEST_ZEO_HOST')
zeo_port = int(get('TEST_ZEO_PORT'))
LOG.info('Using ZEO server (%s:%d)' % (zeo_host, zeo_port))
return ClientStorage((zeo_host, zeo_port))
elif os.environ.has_key('TEST_FILESTORAGE'):
elif 'TEST_FILESTORAGE' in os.environ:
import ZODB.FileStorage
datafs = get('TEST_FILESTORAGE')
LOG.info('Using Filestorage at (%s)' % datafs)
......
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# Dispatcher for usage inside Zope test environment
# Andreas Jung, andreas@digicool.com 03/24/2001
import os,sys,re,string
import threading,time,commands,profile
class Dispatcher:
"""
a multi-purpose thread dispatcher
"""
def __init__(self,func=''):
self.fp = sys.stderr
self.f_startup = []
self.f_teardown = []
self.lastlog = ""
self.lock = threading.Lock()
self.func = func
self.profiling = 0
self.doc = getattr(self,self.func).__doc__
def setlog(self,fp):
self.fp = fp
def log(self,s):
if s==self.lastlog: return
self.fp.write(s)
self.fp.flush()
self.lastlog=s
def logn(self,s):
if s==self.lastlog: return
self.fp.write(s + '\n')
self.fp.flush()
self.lastlog=s
def profiling_on():
self.profiling = 1
def profiling_off():
self.profiling = 0
def dispatcher(self,name='', *params):
""" dispatcher for threads
The dispatcher expects one or several tupels:
(functionname, number of threads to start , args, keyword args)
"""
self.mem_usage = [-1]
mem_watcher = threading.Thread(None,self.mem_watcher,name='memwatcher')
mem_watcher.start()
self.start_test = time.time()
self.name = name
self.th_data = {}
self.runtime = {}
self._threads = []
s2s=self.s2s
for func,numthreads,args,kw in params:
f = getattr(self,func)
for i in range(0,numthreads):
kw['t_func'] = func
th = threading.Thread(None,self.worker,name="TH_%s_%03d" % (func,i) ,args=args,kwargs=kw)
self._threads.append(th)
for th in self._threads: th.start()
while threading.activeCount() > 1: time.sleep(1)
self.logn('ID: %s ' % self.name)
self.logn('FUNC: %s ' % self.func)
self.logn('DOC: %s ' % self.doc)
self.logn('Args: %s' % params)
for th in self._threads:
self.logn( '%-30s ........................ %9.3f sec' % (th.getName(), self.runtime[th.getName()]) )
for k,v in self.th_data[th.getName()].items():
self.logn ('%-30s %-15s = %s' % (' ',k,v) )
self.logn("")
self.logn('Complete running time: %9.3f sec' % (time.time()-self.start_test) )
if len(self.mem_usage)>1: self.mem_usage.remove(-1)
self.logn( "Memory: start: %s, end: %s, low: %s, high: %s" % \
(s2s(self.mem_usage[0]),s2s(self.mem_usage[-1]),s2s(min(self.mem_usage)), s2s(max(self.mem_usage))))
self.logn('')
def worker(self,*args,**kw):
for func in self.f_startup: f = getattr(self,func)()
t_func = getattr(self,kw['t_func'])
del kw['t_func']
ts = time.time()
apply(t_func,args,kw)
te = time.time()
for func in self.f_teardown: getattr(self,func)()
def th_setup(self):
""" initalize thread with some environment data """
env = {'start': time.time()
}
return env
def th_teardown(self,env,**kw):
""" famous last actions of thread """
self.lock.acquire()
self.th_data[ threading.currentThread().getName() ] = kw
self.runtime [ threading.currentThread().getName() ] = time.time() - env['start']
self.lock.release()
def getmem(self):
""" try to determine the current memory usage """
if not sys.platform in ['linux2']: return None
cmd = '/bin/ps --no-headers -o pid,vsize --pid %s' % os.getpid()
outp = commands.getoutput(cmd)
pid,vsize = filter(lambda x: x!="" , string.split(outp," ") )
data = open("/proc/%d/statm" % os.getpid()).read()
fields = re.split(" ",data)
mem = string.atoi(fields[0]) * 4096
return mem
def mem_watcher(self):
""" thread for watching memory usage """
running = 1
while running ==1:
self.mem_usage.append( self.getmem() )
time.sleep(1)
if threading.activeCount() == 2: running = 0
def register_startup(self,func):
self.f_startup.append(func)
def register_teardown(self,func):
self.f_teardown.append(func)
def s2s(self,n):
import math
if n <1024.0: return "%8.3lf Bytes" % n
if n <1024.0*1024.0: return "%8.3lf KB" % (1.0*n/1024.0)
if n <1024.0*1024.0*1024.0: return "%8.3lf MB" % (1.0*n/1024.0/1024.0)
else: return n
if __name__=="__main__":
d=Dispatcher()
print d.getmem()
pass
......@@ -15,24 +15,24 @@ Facilitates unit tests which requires an acquirable REQUEST from
ZODB objects
"""
import os
from sys import stdin, stdout
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.BaseRequest import RequestContainer
def makerequest(app, stdout=stdout, environ=None):
"""
Adds an HTTPRequest at app.REQUEST, and returns
app.__of__(app.REQUEST). Useful for tests that need to acquire
REQUEST.
Usage:
import makerequest
app = makerequest.makerequest(app)
You should only wrap the object used as 'root' in your tests.
app is commonly a Zope2.app(), but that's not strictly necessary
and frequently may be overkill; you can wrap other objects as long
as they support acquisition and provide enough of the features of
......@@ -52,15 +52,14 @@ def makerequest(app, stdout=stdout, environ=None):
resp = HTTPResponse(stdout=stdout)
environ.setdefault('SERVER_NAME', 'foo')
environ.setdefault('SERVER_PORT', '80')
environ.setdefault('REQUEST_METHOD', 'GET')
environ.setdefault('REQUEST_METHOD', 'GET')
req = HTTPRequest(stdin, environ, resp)
req._steps = ['noobject'] # Fake a published object.
req['ACTUAL_URL'] = req.get('URL') # Zope 2.7.4
# set Zope3-style default skin so that the request is usable for
# Zope3-style view look-ups.
req['ACTUAL_URL'] = req.get('URL') # Zope 2.7.4
# Set default skin so that the request is usable for view look-ups.
from zope.publisher.browser import setDefaultSkin
setDefaultSkin(req)
requestcontainer = RequestContainer(REQUEST = req)
requestcontainer = RequestContainer(REQUEST=req)
return app.__of__(requestcontainer)
......@@ -19,6 +19,7 @@ from Acquisition import Implicit
from Testing.makerequest import makerequest
from OFS.SimpleItem import SimpleItem
class MakerequestTests(unittest.TestCase):
def test_makerequest(self):
......@@ -29,7 +30,7 @@ class MakerequestTests(unittest.TestCase):
self.assertFalse(hasattr(item, 'REQUEST'))
item = makerequest(item)
self.assertTrue(hasattr(item, 'REQUEST'))
def test_dont_break_getPhysicalPath(self):
# see http://www.zope.org/Collectors/Zope/2057. If you want
# to call getPhysicalPath() on the wrapped object, be sure
......@@ -57,8 +58,3 @@ class MakerequestTests(unittest.TestCase):
environ = {'foofoo': 'barbar'}
item = makerequest(SimpleItem(), environ=environ)
self.assertEqual(item.REQUEST.environ['foofoo'], 'barbar')
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(MakerequestTests))
return suite
......@@ -18,6 +18,7 @@ import unittest
from Testing.ZopeTestCase import FunctionalDocTestSuite
from OFS.SimpleItem import Item
class CookieStub(Item):
"""This is a cookie stub."""
......@@ -25,6 +26,7 @@ class CookieStub(Item):
REQUEST.RESPONSE.setCookie('evil', 'cookie')
return 'Stub'
def doctest_cookies():
"""
We want to make sure that our testbrowser correctly understands
......@@ -52,6 +54,7 @@ def doctest_cookies():
True
"""
def doctest_camel_case_headers():
"""Make sure that the headers come out in camel case.
......@@ -76,5 +79,5 @@ def doctest_camel_case_headers():
def test_suite():
return unittest.TestSuite((
FunctionalDocTestSuite(),
))
FunctionalDocTestSuite(),
))
This diff is collapsed.
......@@ -15,10 +15,9 @@
from zExceptions import Unauthorized, Forbidden, NotFound, BadRequest
class BaseResponse:
"""Base Response Class
What should be here?
"""
debug_mode = None
_auth = None
......@@ -59,7 +58,7 @@ class BaseResponse:
'Returns the current HTTP status code as an integer. '
return self.status
def setCookie(self,name,value,**kw):
def setCookie(self, name, value, **kw):
'''\
Set an HTTP cookie on the browser
......@@ -101,12 +100,12 @@ class BaseResponse:
return str(self.body)
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, `self.body`)
return '%s(%r)' % (self.__class__.__name__, self.body)
def flush(self):
pass
def write(self,data):
def write(self, data):
"""\
Return data as a stream
......@@ -121,7 +120,7 @@ class BaseResponse:
after beginning stream-oriented output.
"""
self.body = self.body+data
self.body = self.body + data
def exception(self, fatal=0, info=None):
"""Handle an exception.
......@@ -135,23 +134,23 @@ class BaseResponse:
def notFoundError(self, v=''):
"""Generate an error indicating that an object was not found.
"""
raise NotFound, v
raise NotFound(v)
def debugError(self, v=''):
"""Raise an error with debigging info and in debugging mode"""
raise NotFound, "Debugging notice: %s" % v
raise NotFound("Debugging notice: %s" % v)
def badRequestError(self, v=''):
"""Raise an error indicating something wrong with the request"""
raise BadRequest, v
raise BadRequest(v)
def forbiddenError(self, v=''):
"""Raise an error indicating that the request cannot be done"""
raise Forbidden, v
raise Forbidden(v)
def unauthorized(self):
"""Raise an eror indicating that the user was not authizated
Make sure to generate an appropriate challenge, as appropriate.
"""
raise Unauthorized
raise Unauthorized()
......@@ -15,10 +15,9 @@
from Acquisition import aq_base
from logging import getLogger
# Interface
LOG = getLogger('ZPublisher')
def registerBeforeTraverse(container, object, app_handle, priority=99):
"""Register an object to be called before a container is traversed.
......@@ -36,6 +35,7 @@ def registerBeforeTraverse(container, object, app_handle, priority=99):
btr[(priority, app_handle)] = object
rewriteBeforeTraverse(container, btr)
def unregisterBeforeTraverse(container, app_handle):
"""Unregister a __before_traverse__ hook object, given its 'app_handle'.
......@@ -50,6 +50,7 @@ def unregisterBeforeTraverse(container, app_handle):
rewriteBeforeTraverse(container, btr)
return objects
def queryBeforeTraverse(container, app_handle):
"""Find __before_traverse__ hook objects, given an 'app_handle'.
......@@ -61,7 +62,6 @@ def queryBeforeTraverse(container, app_handle):
objects.append((k[0], btr[k]))
return objects
# Implementation tools
def rewriteBeforeTraverse(container, btr):
"""Rewrite the list of __before_traverse__ hook objects"""
......@@ -79,6 +79,7 @@ def rewriteBeforeTraverse(container, btr):
for key in keys:
bpth.add(btr[key])
class MultiHook:
"""Class used to multiplex hook.
......@@ -102,13 +103,12 @@ class MultiHook:
try:
cob(container, request)
except TypeError:
LOG.error('%s call %s failed.' % (
`self._hookname`, `cob`), exc_info=True)
LOG.error('%r call %r failed.' % (
self._hookname, cob), exc_info=True)
def add(self, cob):
self._list.append(cob)
# Helper class
class NameCaller:
"""Class used to proxy sibling objects by name.
......@@ -134,10 +134,8 @@ class NameCaller:
# This happens especially, if "meth" is a "CookieCrumber" instance,
# i.e. in a CMF Portal, if a DTMLMethod (or a similar object
# with a fake "func_code" is in the acquisition context
#args = getattr(getattr(meth, 'func_code', None), 'co_argcount', 2)
args = getattr(getattr(aq_base(meth), 'func_code', None),
'co_argcount',
2)
'co_argcount', 2)
try:
meth(*(container, request, None)[:args])
......@@ -148,5 +146,5 @@ class NameCaller:
# Only catch exceptions that are likely to be logic errors.
# We shouldn't catch Redirects, Unauthorizeds, etc. since
# the programmer may want to raise them deliberately.
LOG.error('BeforeTraverse: Error while invoking hook: "%s"' % self.name,
exc_info=True)
LOG.error('BeforeTraverse: Error while invoking hook: "%s"' %
self.name, exc_info=True)
This diff is collapsed.
......@@ -12,7 +12,6 @@
##############################################################################
import re
from types import ListType, TupleType, UnicodeType
from DateTime import DateTime
from DateTime.interfaces import SyntaxError
from cgi import escape
......@@ -20,89 +19,106 @@ from cgi import escape
# This may get overwritten during configuration
default_encoding = 'utf-8'
def field2string(v):
if hasattr(v,'read'): return v.read()
elif isinstance(v,UnicodeType):
if hasattr(v, 'read'):
return v.read()
elif isinstance(v, unicode):
return v.encode(default_encoding)
else:
return str(v)
def field2text(v, nl=re.compile('\r\n|\n\r').search):
v = field2string(v)
mo = nl(v)
if mo is None: return v
if mo is None:
return v
l = mo.start(0)
r=[]
s=0
r = []
s = 0
while l >= s:
r.append(v[s:l])
s=l+2
mo=nl(v,s)
if mo is None: l=-1
else: l=mo.start(0)
s = l + 2
mo = nl(v, s)
if mo is None:
l = -1
else:
l = mo.start(0)
r.append(v[s:])
return '\n'.join(r)
def field2required(v):
v = field2string(v)
if v.strip(): return v
raise ValueError, 'No input for required field<p>'
if v.strip():
return v
raise ValueError('No input for required field<p>')
def field2int(v):
if isinstance(v, (ListType, TupleType)):
if isinstance(v, (list, tuple)):
return map(field2int, v)
v = field2string(v)
if v:
try: return int(v)
try:
return int(v)
except ValueError:
raise ValueError, (
"An integer was expected in the value %s" % escape(`v`)
)
raise ValueError, 'Empty entry when <strong>integer</strong> expected'
raise ValueError(
"An integer was expected in the value %r" % escape(v)
)
raise ValueError('Empty entry when <strong>integer</strong> expected')
def field2float(v):
if isinstance(v, (ListType, TupleType)):
if isinstance(v, (list, tuple)):
return map(field2float, v)
v = field2string(v)
if v:
try: return float(v)
try:
return float(v)
except ValueError:
raise ValueError, (
"A floating-point number was expected in the value %s" %
escape(`v`)
)
raise ValueError, (
raise ValueError(
"A floating-point number was expected in the value %r" %
escape(v)
)
raise ValueError(
'Empty entry when <strong>floating-point number</strong> expected')
def field2long(v):
if isinstance(v, (ListType, TupleType)):
if isinstance(v, (list, tuple)):
return map(field2long, v)
v = field2string(v)
# handle trailing 'L' if present.
if v[-1:] in ('L', 'l'):
v = v[:-1]
if v:
try: return long(v)
try:
return int(v)
except ValueError:
raise ValueError, (
"A long integer was expected in the value %s" % escape(`v`)
)
raise ValueError, 'Empty entry when <strong>integer</strong> expected'
raise ValueError(
"A long integer was expected in the value %r" % escape(v)
)
raise ValueError('Empty entry when <strong>integer</strong> expected')
def field2tokens(v):
v = field2string(v)
return v.split()
def field2lines(v):
if isinstance(v, (ListType, TupleType)):
result=[]
if isinstance(v, (list, tuple)):
result = []
for item in v:
result.append(str(item))
return result
return field2text(v).splitlines()
def field2date(v):
v = field2string(v)
try:
......@@ -111,6 +127,7 @@ def field2date(v):
raise SyntaxError("Invalid DateTime " + escape(repr(v)))
return v
def field2date_international(v):
v = field2string(v)
try:
......@@ -119,46 +136,57 @@ def field2date_international(v):
raise SyntaxError("Invalid DateTime " + escape(repr(v)))
return v
def field2boolean(v):
if v == 'False':
return not 1
return not not v
class _unicode_converter:
def __call__(self,v):
# Convert a regular python string. This probably doesnt do what you want,
# whatever that might be. If you are getting exceptions below, you
# probably missed the encoding tag from a form field name. Use:
def __call__(self, v):
# Convert a regular python string. This probably doesn't do
# what you want, whatever that might be. If you are getting
# exceptions below, you probably missed the encoding tag
# from a form field name. Use:
# <input name="description:utf8:ustring" .....
# rather than
# <input name="description:ustring" .....
if hasattr(v,'read'): v=v.read()
if hasattr(v, 'read'):
v = v.read()
v = unicode(v)
return self.convert_unicode(v)
def convert_unicode(self,v):
def convert_unicode(self, v):
raise NotImplementedError('convert_unicode')
class field2ustring(_unicode_converter):
def convert_unicode(self,v):
def convert_unicode(self, v):
return v
field2ustring = field2ustring()
class field2utokens(_unicode_converter):
def convert_unicode(self,v):
def convert_unicode(self, v):
return v.split()
field2utokens = field2utokens()
class field2utext(_unicode_converter):
def convert_unicode(self,v):
return unicode(field2text(v.encode('utf8')),'utf8')
def convert_unicode(self, v):
return unicode(field2text(v.encode('utf8')), 'utf8')
field2utext = field2utext()
class field2ulines:
def __call__(self, v):
if hasattr(v,'read'):
v=v.read()
if isinstance(v, (ListType, TupleType)):
if hasattr(v, 'read'):
v = v.read()
if isinstance(v, (list, tuple)):
return [field2ustring(x) for x in v]
v = unicode(v)
return self.convert_unicode(v)
......@@ -169,21 +197,21 @@ class field2ulines:
field2ulines = field2ulines()
type_converters = {
'float': field2float,
'int': field2int,
'long': field2long,
'string': field2string,
'date': field2date,
'float': field2float,
'int': field2int,
'long': field2long,
'string': field2string,
'date': field2date,
'date_international': field2date_international,
'required': field2required,
'tokens': field2tokens,
'lines': field2lines,
'text': field2text,
'boolean': field2boolean,
'ustring': field2ustring,
'utokens': field2utokens,
'ulines': field2ulines,
'utext': field2utext,
}
get_converter=type_converters.get
'required': field2required,
'tokens': field2tokens,
'lines': field2lines,
'text': field2text,
'boolean': field2boolean,
'ustring': field2ustring,
'utokens': field2utokens,
'ulines': field2ulines,
'utext': field2utext,
}
get_converter = type_converters.get
......@@ -19,11 +19,13 @@ flag-interface and some support functions for implementing this functionality.
For an implementation example, see the File class in OFS/Image.py.
"""
import re, sys
import re
import sys
from zope.interface import Interface
WHITESPACE = re.compile('\s*', re.MULTILINE)
def parseRange(header):
"""RFC 2616 (HTTP 1.1) Range header parsing.
......@@ -32,7 +34,6 @@ def parseRange(header):
end offset to be inclusive, we return python convention indexes, where the
end is exclusive. Syntactically incorrect headers are to be ignored, so if
we encounter one we return None.
"""
ranges = []
......@@ -43,8 +44,11 @@ def parseRange(header):
header = WHITESPACE.sub('', header)
# A range header only can specify a byte range
try: spec, sets = header.split('=')
except ValueError: return None
try:
spec, sets = header.split('=')
except ValueError:
return None
if spec != 'bytes':
return None
......@@ -57,8 +61,10 @@ def parseRange(header):
return None
for set in sets:
try: start, end = set.split('-')
except ValueError: return None
try:
start, end = set.split('-')
except ValueError:
return None
# Catch empty sets
if not start and not end:
......@@ -67,10 +73,14 @@ def parseRange(header):
# Convert to integers or None (which will raise errors if
# non-integers were used (which is what we want)).
try:
if start == '': start = None
else: start = int(start)
if end == '': end = None
else: end = int(end)
if start == '':
start = None
else:
start = int(start)
if end == '':
end = None
else:
end = int(end)
except ValueError:
return None
......@@ -84,7 +94,7 @@ def parseRange(header):
if not start:
start = sys.maxint
elif end is not None:
end = end + 1 # Make the end of the range exclusive
end = end + 1 # Make the end of the range exclusive
if end is not None and end <= start:
return None
......@@ -94,11 +104,11 @@ def parseRange(header):
return ranges
def expandRanges(ranges, size):
"""Expand Range sets, given those sets and the length of the resource.
Expansion means relative start values and open ends
"""
expanded = []
......@@ -107,13 +117,15 @@ def expandRanges(ranges, size):
if start < 0:
start = size + start
end = end or size
if end > size: end = size
if end > size:
end = size
# Only use satisfiable ranges
if start < size:
add((start, end))
return expanded
class HTTPRangeInterface(Interface):
"""Objects implementing this Interface support the HTTP Range header.
......@@ -124,5 +136,4 @@ class HTTPRangeInterface(Interface):
This interface specifies no methods, as this functionality can either be
implemented in the index_html or __call__ methods of a published object.
"""
This diff is collapsed.
from zope.interface import Interface
from zope.interface import implements
class IUnboundStreamIterator(Interface):
"""
An iterator with unknown length that can be published.
......@@ -25,7 +26,8 @@ class IStreamIterator(IUnboundStreamIterator):
is still closed, ZODB would raise an error. If the connection
happens to be re-opened by another thread, ZODB might allow it,
but it has a chance of going insane if it happens to be loading
or storing something in the other thread at the same time. """
or storing something in the other thread at the same time.
"""
def __len__():
"""
......@@ -42,7 +44,7 @@ class filestream_iterator(file):
implements(IStreamIterator)
def __init__(self, name, mode='r', bufsize=-1, streamsize=1<<16):
def __init__(self, name, mode='r', bufsize=-1, streamsize=1 << 16):
file.__init__(self, name, mode, bufsize)
self.streamsize = streamsize
......@@ -57,5 +59,4 @@ class filestream_iterator(file):
self.seek(0, 2)
size = self.tell()
self.seek(cur_pos, 0)
return size
This diff is collapsed.
......@@ -10,6 +10,11 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import HTTPRequest
Request=HTTPRequest.HTTPRequest
del HTTPRequest
from zope.deferredimport import deprecated
# BBB: Zope 5.0
deprecated(
'Please import from ZPublisher.HTTPRequest',
Request='ZPublisher.HTTPRequest:HTTPRequest',
)
......@@ -10,6 +10,11 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import HTTPResponse
Response=HTTPResponse.HTTPResponse
del HTTPResponse
from zope.deferredimport import deprecated
# BBB: Zope 5.0
deprecated(
'Please import from ZPublisher.HTTPResponse',
Response='ZPublisher.HTTPResponse:HTTPResponse',
)
......@@ -11,18 +11,6 @@
#
##############################################################################
# This allows ZPublisher to work with embedded interpreters
# that for some reason have no sys.argv (required by cgi.py).
import sys
if not hasattr(sys, 'argv'):
sys.argv=[]
from zExceptions import NotFound, BadRequest, InternalError, Forbidden # NOQA
from zExceptions import NotFound, BadRequest, InternalError, Forbidden
from Publish import publish_module, Retry
def test(*args, **kw):
global test
import Test
test=Test.publish
return apply(test, args, kw)
from ZPublisher.Publish import publish_module, Retry # NOQA
......@@ -4,6 +4,7 @@ from zope.interface import Interface, Attribute
# Publication events
# These are events notified in 'ZPublisher.Publish.publish'.
class IPubEvent(Interface):
'''Base class for publication events.
......@@ -12,9 +13,11 @@ class IPubEvent(Interface):
'''
request = Attribute('The request being affected')
class IPubStart(IPubEvent):
'''Event notified at the beginning of 'ZPublisher.Publish.publish'.'''
class IPubEnd(IPubEvent):
'''Event notified after request processing.
......@@ -22,16 +25,19 @@ class IPubEnd(IPubEvent):
itself is considered a new event.
'''
class IPubSuccess(IPubEnd):
'''A successful request processing.'''
class IPubFailure(IPubEnd):
'''A failed request processing.
Note: If a subscriber to 'IPubSuccess' raises an exception,
then 'IPubFailure' may be notified in addtion to 'IPubSuccess'.
'''
exc_info = Attribute('''The exception info as returned by 'sys.exc_info()'.''')
exc_info = Attribute(
'''The exception info as returned by 'sys.exc_info()'.''')
retry = Attribute('Whether the request will be retried')
......@@ -44,29 +50,28 @@ class IPubBeforeCommit(IPubEvent):
request processing is finished).
"""
class IPubBeforeAbort(IPubEvent):
"""notified immediately before the transaction abort (i.e. after the main
request processing is finished, and there was an error).
"""
exc_info = Attribute('''The exception info as returned by 'sys.exc_info()'.''')
exc_info = Attribute(
'''The exception info as returned by 'sys.exc_info()'.''')
retry = Attribute('Whether the request will be retried')
class IPubBeforeStreaming(Interface):
"""Event fired just before a streaming response is initiated, i.e. when
something calls response.write() for the first time. Note that this is
carries a reference to the *response*, not the request.
"""
response = Attribute(u"The current HTTP response")
# Exceptions
class UseTraversalDefault(Exception):
"""Indicate default traversal in ``__bobo_traverse__``
This exception can be raised by '__bobo_traverse__' implementations to
indicate that it has no special casing for the given name and that standard
traversal logic should be applied.
"""
......@@ -14,22 +14,26 @@
"""
import zope.publisher.publish
def default_call_object(object, args, context):
result=object(*args) # Type s<cr> to step into published object.
result = object(*args) # Type s<cr> to step into published object.
return result
def default_missing_name(name, context):
raise TypeError, 'argument %s was ommitted' % name
raise TypeError('argument %s was ommitted' % name)
def default_handle_class(klass, context):
if hasattr(klass,'__init__'):
f=klass.__init__.im_func
c=f.func_code
names=c.co_varnames[1:c.co_argcount]
if hasattr(klass, '__init__'):
f = klass.__init__.im_func
c = f.func_code
names = c.co_varnames[1:c.co_argcount]
return klass, names, f.func_defaults
else:
return klass, (), ()
def mapply(object, positional=(), keyword={},
debug=None, maybe=None,
missing_name=default_missing_name,
......@@ -37,7 +41,7 @@ def mapply(object, positional=(), keyword={},
context=None, bind=0,
):
if hasattr(object,'__bases__'):
if hasattr(object, '__bases__'):
f, names, defaults = handle_class(object, context)
else:
try:
......@@ -50,29 +54,34 @@ def mapply(object, positional=(), keyword={},
defaults = f.func_defaults
names = code.co_varnames[count:code.co_argcount]
nargs=len(names)
nargs = len(names)
if positional:
positional=list(positional)
if bind and nargs and names[0]=='self':
positional = list(positional)
if bind and nargs and names[0] == 'self':
positional.insert(0, missing_name('self', context))
if len(positional) > nargs: raise TypeError, 'too many arguments'
args=positional
if len(positional) > nargs:
raise TypeError('too many arguments')
args = positional
else:
if bind and nargs and names[0]=='self':
args=[missing_name('self', context)]
if bind and nargs and names[0] == 'self':
args = [missing_name('self', context)]
else:
args=[]
args = []
get=keyword.get
nrequired=len(names) - (len(defaults or ()))
get = keyword.get
nrequired = len(names) - (len(defaults or ()))
for index in range(len(args), len(names)):
name=names[index]
v=get(name, args)
name = names[index]
v = get(name, args)
if v is args:
if index < nrequired: v=missing_name(name, context)
else: v=defaults[index-nrequired]
if index < nrequired:
v = missing_name(name, context)
else:
v = defaults[index - nrequired]
args.append(v)
args=tuple(args)
if debug is not None: return debug(object,args,context)
else: return object(*args)
args = tuple(args)
if debug is not None:
return debug(object, args, context)
else:
return object(*args)
......@@ -11,9 +11,4 @@
#
##############################################################################
# Waaaa, I wish I didn't have to work this hard.
try: from thread import allocate_lock
except:
class allocate_lock:
def acquire(*args): pass
def release(*args): pass
from thread import allocate_lock # NOQA
'''Publication events.
They are notified in 'ZPublisher.Publish.publish' and
They are notified in 'ZPublisher.Publish.publish' and
inform about publications and their fate.
Subscriptions can be used for all kinds of request supervision,
......@@ -9,9 +9,12 @@ for detailed time related analysis, inline request monitoring.
'''
from zope.interface import implements
from interfaces import IPubStart, IPubSuccess, IPubFailure, \
IPubAfterTraversal, IPubBeforeCommit, IPubBeforeAbort, \
IPubBeforeStreaming
from ZPublisher.interfaces import (
IPubStart, IPubSuccess, IPubFailure,
IPubAfterTraversal, IPubBeforeCommit, IPubBeforeAbort,
IPubBeforeStreaming,
)
class _Base(object):
"""PubEvent base class."""
......@@ -19,14 +22,17 @@ class _Base(object):
def __init__(self, request):
self.request = request
class PubStart(_Base):
'''notified at the beginning of 'ZPublisher.Publish.publish'.'''
implements(IPubStart)
class PubSuccess(_Base):
'''notified at successful request end.'''
implements(IPubSuccess)
class PubFailure(object):
'''notified at failed request end.'''
implements(IPubFailure)
......@@ -44,17 +50,19 @@ class PubBeforeCommit(_Base):
"""notified immediately before the commit."""
implements(IPubBeforeCommit)
class PubBeforeAbort(_Base):
"""notified immediately before an abort."""
implements(IPubBeforeAbort)
def __init__(self, request, exc_info, retry):
self.request, self.exc_info, self.retry = request, exc_info, retry
class PubBeforeStreaming(object):
"""Notified immediately before streaming via response.write() commences
"""
implements(IPubBeforeStreaming)
def __init__(self, response):
self.response = response
## This script requires:
## - python >= 2.4
## - zope.testbrowser
##
## The just run:
## $python generate_conflicts.py
import base64
import string
import threading
import urllib2
from zope.testbrowser.browser import Browser
# create our browser
class AuthBrowser(Browser):
def addBasicAuth(self,username,password):
self.addHeader(
'Authorization',
'Basic '+base64.encodestring(username+':'+password).strip()
)
def open(self,uri,include_server=True):
if include_server:
uri = server+uri
return Browser.open(self,uri)
browser = AuthBrowser()
# constants
server = 'http://localhost:8080'
# the following user must be able to view the management screens
# and create file objects
username = 'username'
password = 'password'
browser.addBasicAuth(username,password)
threads = 10
filename = 'conflict.txt'
filesize = 10000
hits = 5
# delete the file if it's already there
browser.open('/manage_main')
if filename in [c.optionValue
for c in browser.getControl(name='ids:list').controls]:
browser.open('/manage_delObjects?ids:list='+filename)
# create it
browser.open('/manage_addFile?id='+filename)
# edit it, hopefully causing conflicts
data = 'X'*filesize
class EditThread(threading.Thread):
def __init__(self,i):
self.conflicts = 0
self.browser = AuthBrowser()
self.browser.handleErrors = False
self.browser.addBasicAuth(username,password)
threading.Thread.__init__(self,name=str(i))
def run(self):
for i in range(1,hits+1):
self.browser.open('/conflict.txt/manage_main')
self.browser.getControl(name='title').value='Test Title'
self.browser.getControl(name='filedata:text').value = data
try:
self.browser.getControl(name='manage_edit:method').click()
except urllib2.HTTPError,e:
# print e.read()
self.conflicts += 1
print "Thread %s - CONFLICT" % self.getName()
else:
print "Thread %s - EDIT" % self.getName()
thread_objects = []
for i in range(1,threads+1):
t = EditThread(i)
thread_objects.append(t)
t.start()
for t in thread_objects:
t.join()
total = 0
print
for t in thread_objects:
print "Thread %s - %i conflicts seen" % (t.getName(),t.conflicts)
total += t.conflicts
print
print "%i conflicts seen by browsers" % total
This diff is collapsed.
import sys
import logging
import doctest
from Acquisition import Implicit
from ZPublisher import BeforeTraverse
from ZPublisher.BaseRequest import BaseRequest
from ZPublisher.HTTPResponse import HTTPResponse
def makeBaseRequest(root):
response = HTTPResponse()
environment = { 'URL': '',
'PARENTS': [root],
'steps': [],
'_hacked_path': 0,
'_test_counter': 0,
'response': response }
environment = {
'URL': '',
'PARENTS': [root],
'steps': [],
'_hacked_path': 0,
'_test_counter': 0,
'response': response,
}
return BaseRequest(environment)
......@@ -23,19 +24,24 @@ class DummyObjectBasic(Implicit):
pass
class BrokenHook:
class BrokenHook(object):
def __call__(self, *args):
print self.__class__.__name__, 'called'
raise TypeError, self.__class__.__name__
print('%s called' % self.__class__.__name__)
raise TypeError(self.__class__.__name__)
def testBeforeTraverse(self):
"""
"""
Zope supports a 'before traverse' hook that is used for several
features, including 'Site Access Rules'. It is implemented using a
special API for registering hooks, and the hooks themselves are
called during traversal by ZPublisher.
>>> import sys
>>> import logging
>>> from ZPublisher import BeforeTraverse
>>> root = DummyObjectBasic()
>>> request = makeBaseRequest(root)
......@@ -58,7 +64,7 @@ def testBeforeTraverse(self):
{(99, 'broken_hook'): <ZPublisher.tests.testBeforeTraverse.BrokenHook ...>}
Setup logging so we can see the actual exception being logged:
>>> logger = logging.getLogger('ZPublisher')
>>> level = logger.level
>>> handlers = logger.handlers[:]
......@@ -67,7 +73,7 @@ def testBeforeTraverse(self):
>>> logger.setLevel(logging.ERROR)
Now do the actual traversal:
>>> _ = request.traverse('container/obj')
BrokenHook called
'__before_publishing_traverse__' call ... failed.
......@@ -93,9 +99,9 @@ def testBeforeTraverse(self):
during traversal you can register a 'NameCaller' as the hook
instead, and it will delegate to the callable by looking it up as
an attribute of the container:
>>> container.broken_callable = BrokenHook()
>>> BeforeTraverse.registerBeforeTraverse(container,
>>> BeforeTraverse.registerBeforeTraverse(container,
... BeforeTraverse.NameCaller('broken_callable'),
... 'broken_callable')
......@@ -103,7 +109,7 @@ def testBeforeTraverse(self):
{(99, 'broken_callable'): <ZPublisher.BeforeTraverse.NameCaller ...>}
Now do the actual traversal:
>>> _ = request.traverse('container/obj')
BrokenHook called
BeforeTraverse: Error while invoking hook: "broken_callable"
......@@ -131,10 +137,7 @@ def testBeforeTraverse(self):
>>> logger.handlers = handlers[:]
"""
pass
import doctest
def test_suite():
return doctest.DocTestSuite(optionflags=doctest.ELLIPSIS)
......@@ -16,17 +16,19 @@ from ZPublisher.HTTPRangeSupport import parseRange, expandRanges
import unittest
class TestRangeHeaderParse(unittest.TestCase):
# Utility methods
def expectNone(self, header):
result = parseRange(header)
self.assertTrue(result is None, 'Expected None, got %s' % `result`)
self.assertTrue(result is None, 'Expected None, got %r' % result)
def expectSets(self, header, sets):
result = parseRange(header)
self.assertTrue(result == sets,
'Expected %s, got %s' % (`sets`, `result`))
self.assertTrue(
result == sets,
'Expected %r, got %r' % (sets, result))
# Syntactically incorrect headers
def testGarbage(self):
......@@ -67,7 +69,8 @@ class TestRangeHeaderParse(unittest.TestCase):
self.expectSets('bytes=100-100', [(100, 101)])
def testMultiple(self):
self.expectSets('bytes=-100,,1-2,20-',
self.expectSets(
'bytes=-100,,1-2,20-',
[(-100, None), (1, 3), (20, None)])
def testFirstByte(self):
......@@ -81,8 +84,9 @@ class TestExpandRanges(unittest.TestCase):
def expectSets(self, sets, size, expect):
result = expandRanges(sets, size)
self.assertTrue(result == expect,
'Expected %s, got %s' % (`expect`, `result`))
self.assertTrue(
result == expect,
'Expected %r, got %r' % (expect, result))
def testExpandOpenEnd(self):
self.expectSets([(1, 2), (5, None)], 50, [(1, 2), (5, 50)])
......@@ -91,23 +95,28 @@ class TestExpandRanges(unittest.TestCase):
self.expectSets([(1, 2), (-5, None)], 50, [(1, 2), (45, 50)])
def testNoOverlapInOrder(self):
self.expectSets([(1, 5), (1000, 2000), (3000, None)], 5000,
self.expectSets(
[(1, 5), (1000, 2000), (3000, None)], 5000,
[(1, 5), (1000, 2000), (3000, 5000)])
def testNoOverlapOutOfOrder(self):
self.expectSets([(1000, 2000), (3000, None), (1, 5)], 5000,
self.expectSets(
[(1000, 2000), (3000, None), (1, 5)], 5000,
[(1000, 2000), (3000, 5000), (1, 5)])
def testOverlapInOrder(self):
self.expectSets([(1, 10), (8, 20), (25, None)], 5000,
self.expectSets(
[(1, 10), (8, 20), (25, None)], 5000,
[(1, 10), (8, 20), (25, 5000)])
def testOverlapOutOfOrder(self):
self.expectSets([(25, 50), (8, None), (1, 10)], 5000,
self.expectSets(
[(25, 50), (8, None), (1, 10)], 5000,
[(25, 50), (8, 5000), (1, 10)])
def testAdjacentInOrder(self):
self.expectSets([(1, 10), (10, 20), (25, 50)], 5000,
self.expectSets(
[(1, 10), (10, 20), (25, 50)], 5000,
[(1, 10), (10, 20), (25, 50)])
def testAdjacentOutOfOrder(self):
......@@ -119,20 +128,3 @@ class TestExpandRanges(unittest.TestCase):
def testRemoveUnsatisfiable(self):
self.expectSets([(sys.maxint, None), (10, 20)], 50, [(10, 20)])
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestRangeHeaderParse, 'test'))
suite.addTest(unittest.makeSuite(TestExpandRanges, 'test'))
return suite
def main():
unittest.TextTestRunner().run(test_suite())
def debug():
test_suite().debug()
def pdebug():
import pdb
pdb.run('debug()')
This diff is collapsed.
This diff is collapsed.
......@@ -2,11 +2,8 @@ import unittest
from zope.interface.verify import verifyClass
from ZPublisher.Iterators import IStreamIterator, filestream_iterator
class TestFileStreamIterator(unittest.TestCase):
def testInterface(self):
verifyClass(IStreamIterator, filestream_iterator)
def test_suite():
suite = unittest.TestSuite()
suite.addTest( unittest.makeSuite( TestFileStreamIterator ) )
return suite
from unittest import TestCase, TestSuite, makeSuite, main
import Testing
import Zope2
Zope2.startup()
from unittest import TestCase
from Acquisition import Implicit
from ZPublisher.BaseRequest import BaseRequest
from ZPublisher.HTTPResponse import HTTPResponse
import Zope2
# Various post traversal methods
Zope2.startup()
pt_simple_was_run = 0
def pt_simple():
global pt_simple_was_run
global pt_simple_was_run
pt_simple_was_run = 1
pass
def pt_static_arg(request, b):
request.set('b', b)
pass
def pt_simple_redirect(a):
return a
def pt_chain_test(request, string):
request.set('a', request.get('a', '') + string)
class DummyObjectBasic(Implicit):
""" Dummy class with docstring.
"""
......@@ -41,6 +42,7 @@ class DummyObjectBasic(Implicit):
"""
return 'view content'
class DummyObjectWithPTHook(DummyObjectBasic):
""" Dummy class with docstring.
"""
......@@ -51,32 +53,35 @@ class DummyObjectWithPTHook(DummyObjectBasic):
for x in self.traversal:
REQUEST.post_traverse(*x)
class TestBaseRequestPT(TestCase):
def setUp(self):
self.root = DummyObjectBasic()
self.f1 = self.root._setObject('folder', DummyObjectBasic() )
self.f1._setObject('objBasic', DummyObjectWithPTHook() )
self.f1 = self.root._setObject('folder', DummyObjectBasic())
self.f1._setObject('objBasic', DummyObjectWithPTHook())
def makeBaseRequest(self):
response = HTTPResponse()
environment = { 'URL': '',
'PARENTS': [self.root],
'steps': [],
'_hacked_path': 0,
'_test_counter': 0,
'response': response }
environment = {
'URL': '',
'PARENTS': [self.root],
'steps': [],
'_hacked_path': 0,
'_test_counter': 0,
'response': response,
}
return BaseRequest(environment)
def test_post_basic(self):
global pt_simple_was_run
pt_simple_was_run = 0
r = self.makeBaseRequest()
# Set hook
self.f1.objBasic.traversal = [(pt_simple,)]
x = r.traverse('folder/objBasic')
# Object should be self.f1.objBasic
self.assertEqual(x, self.f1.objBasic)
self.assertEqual(pt_simple_was_run, 1)
......@@ -101,13 +106,15 @@ class TestBaseRequestPT(TestCase):
r = self.makeBaseRequest()
self.f1.objBasic.traversal = [ (pt_chain_test, (r, 'a')),
(pt_chain_test, (r, 'b')),
(pt_chain_test, (r, 'c')),
(pt_chain_test, (r, 'd'))]
self.f1.objBasic.traversal = [
(pt_chain_test, (r, 'a')),
(pt_chain_test, (r, 'b')),
(pt_chain_test, (r, 'c')),
(pt_chain_test, (r, 'd')),
]
x = r.traverse('folder/objBasic')
self.assertEqual(r.get('a',''), 'abcd')
r.traverse('folder/objBasic')
self.assertEqual(r.get('a', ''), 'abcd')
self.f1.objBasic.traversal = []
......@@ -119,21 +126,20 @@ class TestBaseRequestPT(TestCase):
x = r.traverse('folder/objBasic')
self.assertEqual(x, check)
def test_hook_chain_redirect(self):
r = self.makeBaseRequest()
check = []
self.f1.objBasic.traversal = [ (pt_chain_test, (r, 'a')),
(pt_chain_test, (r, 'b')),
(pt_chain_test, (r, 'c')),
(pt_simple_redirect, (check,)),
(pt_simple_redirect, (1,)),
(pt_chain_test, (r, 'd'))]
self.f1.objBasic.traversal = [
(pt_chain_test, (r, 'a')),
(pt_chain_test, (r, 'b')),
(pt_chain_test, (r, 'c')),
(pt_simple_redirect, (check,)),
(pt_simple_redirect, (1,)),
(pt_chain_test, (r, 'd')),
]
x = r.traverse('folder/objBasic')
self.assertEqual(r.get('a',''), 'abc')
self.assertEqual(r.get('a', ''), 'abc')
self.assertEqual(x, check)
def test_suite():
return TestSuite( ( makeSuite(TestBaseRequestPT), ) )
This diff is collapsed.
......@@ -13,6 +13,7 @@
import unittest
class ConvertersTests(unittest.TestCase):
def test_field2string_with_string(self):
......@@ -29,21 +30,12 @@ class ConvertersTests(unittest.TestCase):
def test_field2string_with_filelike_object(self):
from ZPublisher.Converters import field2string
to_convert = 'to_convert'
class Filelike:
def read(self):
return to_convert
self.assertEqual(field2string(Filelike()), to_convert)
#TODO def test_field2text....
#TODO def test_field2required....
#TODO def test_field2int....
#TODO def test_field2float....
#TODO def test_field2tokens....
def test_field2lines_with_list(self):
from ZPublisher.Converters import field2lines
to_convert = ['one', 'two']
......@@ -68,19 +60,6 @@ class ConvertersTests(unittest.TestCase):
from ZPublisher.Converters import field2lines
to_convert = 'abc\ndef\nghi'
self.assertEqual(field2lines(to_convert), to_convert.splitlines())
#TODO def test_field2date....
#TODO def test_field2date_international....
#TODO def test_field2boolean....
#TODO def test_field2ustring....
#TODO def test_field2utokens....
#TODO def test_field2utext....
def test_field2ulines_with_list(self):
from ZPublisher.Converters import field2ulines
......@@ -108,7 +87,3 @@ class ConvertersTests(unittest.TestCase):
from ZPublisher.Converters import field2ulines
to_convert = u'abc\ndef\nghi'
self.assertEqual(field2ulines(to_convert), to_convert.splitlines())
def test_suite():
return unittest.TestSuite((unittest.makeSuite(ConvertersTests),))
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment