Commit 5a54bdff authored by Hanno Schlichting's avatar Hanno Schlichting

Move ZServer related testing support into ZServer.Testing.

parent a496dc50
...@@ -28,6 +28,8 @@ Features Added ...@@ -28,6 +28,8 @@ Features Added
Restructuring Restructuring
+++++++++++++ +++++++++++++
- Move ZServer related testing support into ZServer.Testing.
- Split out Lifetime, webdav and ZServer packages into a ZServer project. - Split out Lifetime, webdav and ZServer packages into a ZServer project.
- Move webdav's EtagSupport, Lockable and LockItem into OFS. - Move webdav's EtagSupport, Lockable and LockItem into OFS.
......
...@@ -343,11 +343,11 @@ class ImageTests(FileTests): ...@@ -343,11 +343,11 @@ class ImageTests(FileTests):
def testStr(self): def testStr(self):
self.assertEqual( self.assertEqual(
str(self.file), str(self.file),
('<img src="http://foo/file" ' ('<img src="http://nohost/file" '
'alt="" title="" height="16" width="16" />')) 'alt="" title="" height="16" width="16" />'))
def testTag(self): def testTag(self):
tag_fmt = ('<img src="http://foo/file" ' tag_fmt = ('<img src="http://nohost/file" '
'alt="%s" title="%s" height="16" width="16" />') 'alt="%s" title="%s" height="16" width="16" />')
self.assertEqual(self.file.tag(), (tag_fmt % ('', ''))) self.assertEqual(self.file.tag(), (tag_fmt % ('', '')))
self.file.manage_changeProperties(title='foo') self.file.manage_changeProperties(title='foo')
......
...@@ -34,7 +34,7 @@ class VHMRegressions(unittest.TestCase): ...@@ -34,7 +34,7 @@ class VHMRegressions(unittest.TestCase):
def testAbsoluteUrl(self): def testAbsoluteUrl(self):
m = self.app.folder.doc.absolute_url m = self.app.folder.doc.absolute_url
self.assertEqual(m(), 'http://foo/folder/doc') self.assertEqual(m(), 'http://nohost/folder/doc')
def testAbsoluteUrlPath(self): def testAbsoluteUrlPath(self):
m = self.app.folder.doc.absolute_url_path m = self.app.folder.doc.absolute_url_path
...@@ -101,7 +101,7 @@ class VHMRegressions(unittest.TestCase): ...@@ -101,7 +101,7 @@ class VHMRegressions(unittest.TestCase):
def gen_cases(): def gen_cases():
for vbase, ubase in ( for vbase, ubase in (
('', 'http://foo'), ('', 'http://nohost'),
('/VirtualHostBase/http/example.com:80', 'http://example.com')): ('/VirtualHostBase/http/example.com:80', 'http://example.com')):
yield vbase, '', '', 'folder/doc', ubase yield vbase, '', '', 'folder/doc', ubase
......
...@@ -13,22 +13,23 @@ ...@@ -13,22 +13,23 @@
"""TestCase for Zope testing """TestCase for Zope testing
""" """
import ZopeLite as Zope2
import unittest import unittest
import transaction import transaction
import utils
import interfaces
import connections
import layer
from zope.interface import implements from zope.interface import implements
from AccessControl.SecurityManagement import noSecurityManager from AccessControl.SecurityManagement import noSecurityManager
from Testing.makerequest import makerequest
from Testing.ZopeTestCase import connections
from Testing.ZopeTestCase import interfaces
from Testing.ZopeTestCase import layer
from Testing.ZopeTestCase import ZopeLite as Zope2
def app(): def app():
'''Opens a ZODB connection and returns the app object.''' '''Opens a ZODB connection and returns the app object.'''
app = Zope2.app() app = Zope2.app()
app = utils.makerequest(app) app = makerequest(app)
connections.register(app) connections.register(app)
return app return app
......
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
"""ZopeLite layer """ZopeLite layer
""" """
from Testing.ZopeTestCase import utils
_deferred_setup = [] _deferred_setup = []
...@@ -71,9 +73,8 @@ def appcall(func): ...@@ -71,9 +73,8 @@ def appcall(func):
return func(*args, **kw) return func(*args, **kw)
if kw.get('app') is not None: if kw.get('app') is not None:
return func(*args, **kw) return func(*args, **kw)
def caller(*args, **kw): def caller(*args, **kw):
import utils
utils.appcall(func, *args, **kw) utils.appcall(func, *args, **kw)
_deferred_setup.append((caller, args, kw)) _deferred_setup.append((caller, args, kw))
return appcalled_func return appcalled_func
...@@ -13,11 +13,11 @@ ...@@ -13,11 +13,11 @@
"""Support for ZODB sandboxes in ZTC """Support for ZODB sandboxes in ZTC
""" """
import ZopeLite as Zope2
import transaction import transaction
import base
import utils from Testing.makerequest import makerequest
import connections from Testing.ZopeTestCase import connections
from Testing.ZopeTestCase import ZopeLite as Zope2
class Sandboxed: class Sandboxed:
...@@ -32,7 +32,7 @@ class Sandboxed: ...@@ -32,7 +32,7 @@ class Sandboxed:
'''Returns the app object for a test.''' '''Returns the app object for a test.'''
app = Zope2.app(Zope2.sandbox().open()) app = Zope2.app(Zope2.sandbox().open())
AppZapper().set(app) AppZapper().set(app)
app = utils.makerequest(app) app = makerequest(app)
connections.register(app) connections.register(app)
return app return app
......
##############################################################################
#
# Copyright (c) 2005 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Example ZopeTestCase testing web access to a freshly started ZServer
Note that we need to set up the error_log before starting the ZServer.
Note further that the test thread needs to explicitly commit its
transactions, so the ZServer threads can see modifications made to
the ZODB.
IF YOU THINK YOU NEED THE WEBSERVER STARTED YOU ARE PROBABLY WRONG!
This is only required in very special cases, like when testing
ZopeXMLMethods where XSLT processing is done by external tools that
need to URL-call back into the Zope server.
If you want to write functional unit tests, see the testFunctional.py
example instead.
"""
from Testing import ZopeTestCase
from Testing.ZopeTestCase import transaction
from AccessControl import Unauthorized
import urllib
# Create the error_log object
ZopeTestCase.utils.setupSiteErrorLog()
# Start the web server
ZopeTestCase.utils.startZServer()
class ManagementOpener(urllib.FancyURLopener):
'''Logs on as manager when prompted'''
def prompt_user_passwd(self, host, realm):
return ('manager', 'secret')
class UnauthorizedOpener(urllib.FancyURLopener):
'''Raises Unauthorized when prompted'''
def prompt_user_passwd(self, host, realm):
raise Unauthorized, 'The URLopener was asked for authentication'
class TestWebserver(ZopeTestCase.ZopeTestCase):
def afterSetUp(self):
uf = self.folder.acl_users
uf.userFolderAddUser('manager', 'secret', ['Manager'], [])
self.folder_url = self.folder.absolute_url()
# A simple document
self.folder.addDTMLDocument('index_html', file='index_html called')
# A document only accessible to manager
self.folder.addDTMLDocument('secret_html', file='secret_html called')
for p in ZopeTestCase.standard_permissions:
self.folder.secret_html.manage_permission(p, ['Manager'])
# A method to change the title property of an object
self.folder.addDTMLMethod('change_title',
file='''<dtml-call "manage_changeProperties(title=REQUEST.get('title'))">'''
'''<dtml-var title_or_id>''')
manager = uf.getUserById('manager').__of__(uf)
self.folder.change_title.changeOwnership(manager)
# Commit so the ZServer threads can see the changes
transaction.commit()
def beforeClose(self):
# Commit after cleanup
transaction.commit()
def testAccessPublicObject(self):
# Test access to a public resource
page = self.folder.index_html(self.folder)
self.assertEqual(page, 'index_html called')
def testURLAccessPublicObject(self):
# Test web access to a public resource
urllib._urlopener = ManagementOpener()
page = urllib.urlopen(self.folder_url+'/index_html').read()
self.assertEqual(page, 'index_html called')
def testAccessProtectedObject(self):
# Test access to a protected resource
page = self.folder.secret_html(self.folder)
self.assertEqual(page, 'secret_html called')
def testURLAccessProtectedObject(self):
# Test web access to a protected resource
urllib._urlopener = ManagementOpener()
page = urllib.urlopen(self.folder_url+'/secret_html').read()
self.assertEqual(page, 'secret_html called')
def testSecurityOfPublicObject(self):
# Test security of a public resource
try:
self.folder.restrictedTraverse('index_html')
except Unauthorized:
# Convert error to failure
self.fail('Unauthorized')
def testURLSecurityOfPublicObject(self):
# Test web security of a public resource
urllib._urlopener = UnauthorizedOpener()
try:
urllib.urlopen(self.folder_url+'/index_html')
except Unauthorized:
# Convert error to failure
self.fail('Unauthorized')
def testSecurityOfProtectedObject(self):
# Test security of a protected resource
try:
self.folder.restrictedTraverse('secret_html')
except Unauthorized:
pass # Test passed
else:
self.fail('Resource not protected')
def testURLSecurityOfProtectedObject(self):
# Test web security of a protected resource
urllib._urlopener = UnauthorizedOpener()
try:
urllib.urlopen(self.folder_url+'/secret_html')
except Unauthorized:
pass # Test passed
else:
self.fail('Resource not protected')
def testModifyObject(self):
# Test a script that modifies the ZODB
self.setRoles(['Manager'])
self.app.REQUEST.set('title', 'Foo')
page = self.folder.index_html.change_title(self.folder.index_html,
self.app.REQUEST)
self.assertEqual(page, 'Foo')
self.assertEqual(self.folder.index_html.title, 'Foo')
def testURLModifyObject(self):
# Test a transaction that actually commits something
urllib._urlopener = ManagementOpener()
page = urllib.urlopen(self.folder_url+'/index_html/change_title?title=Foo').read()
self.assertEqual(page, 'Foo')
class TestSandboxedWebserver(ZopeTestCase.Sandboxed, TestWebserver):
'''Demonstrates that tests involving ZServer threads can also be
run from sandboxes. In fact, it may be preferable to do so.
'''
# Note: By inheriting from TestWebserver we run the same
# test methods as above!
def testConnectionIsShared(self):
# Due to sandboxing the ZServer thread operates on the
# same connection as the main thread, allowing us to
# see changes made to 'index_html' right away.
urllib._urlopener = ManagementOpener()
urllib.urlopen(self.folder_url+'/index_html/change_title?title=Foo')
self.assertEqual(self.folder.index_html.title, 'Foo')
def testCanCommit(self):
# Additionally, it allows us to commit transactions without
# harming the test ZODB.
self.folder.foo = 1
transaction.commit()
self.folder.foo = 2
transaction.commit()
def test_suite():
from unittest import TestSuite, makeSuite
suite = TestSuite()
suite.addTest(makeSuite(TestWebserver))
suite.addTest(makeSuite(TestSandboxedWebserver))
return suite
...@@ -10,56 +10,15 @@ ...@@ -10,56 +10,15 @@
# FOR A PARTICULAR PURPOSE. # FOR A PARTICULAR PURPOSE.
# #
############################################################################## ##############################################################################
"""Parts of ZServer support are in this module so they can
be imported more selectively.
"""
from threading import Thread
from StringIO import StringIO
dummyLOG = StringIO()
def setNumberOfThreads(number_of_threads):
'''Sets number of ZServer threads.'''
try:
from ZServer.Zope2.Startup.config import setNumberOfThreads
setNumberOfThreads(number_of_threads)
except ImportError:
pass
def zserverRunner(host, port, log=None):
'''Runs an HTTP ZServer on host:port.'''
from ZServer import logger, asyncore
from ZServer import zhttp_server, zhttp_handler
if log is None: log = dummyLOG
lg = logger.file_logger(log)
hs = zhttp_server(ip=host, port=port, resolver=None, logger_object=lg)
zh = zhttp_handler(module='Zope2', uri_base='')
hs.install_handler(zh)
asyncore.loop()
class QuietThread(Thread):
'''This thread eats all exceptions'''
def __init__(self, target=None, args=(), kwargs={}):
Thread.__init__(self, target=target, args=args, kwargs=kwargs)
self.__old_bootstrap = Thread._Thread__bootstrap
def __bootstrap(self):
try: self.__old_bootstrap(self)
except: pass
_Thread__bootstrap = __bootstrap
def QuietPublisher(self, accept):
'''This server eats all exceptions'''
try: self.__old_init__(accept)
except: pass
from ZServer.PubCore.ZServerPublisher import ZServerPublisher
if not hasattr(ZServerPublisher, '__old_init__'):
ZServerPublisher.__old_init__ = ZServerPublisher.__init__
ZServerPublisher.__init__ = QuietPublisher
from zope.deferredimport import deprecated
# BBB Zope 5.0
deprecated(
'Please import from ZServer.Testing.threadutils.',
dummyLOG='ZServer.Testing.threadutils:dummyLOG',
setNumberOfThreads='ZServer.Testing.threadutils:setNumberOfThreads',
zserverRunner='ZServer.Testing.threadutils:zserverRunner',
QuietThread='ZServer.Testing.threadutils:QuietThread',
QuietPublisher='ZServer.Testing.threadutils:QuietPublisher',
)
...@@ -10,122 +10,23 @@ ...@@ -10,122 +10,23 @@
# FOR A PARTICULAR PURPOSE. # FOR A PARTICULAR PURPOSE.
# #
############################################################################## ##############################################################################
"""Utility functions
These functions are designed to be imported and run at
module level to add functionality to the test environment.
"""
import os
import sys
import time
import random
import transaction import transaction
import layer from zope.deferredimport import deprecated
@layer.appcall
def setupCoreSessions(app):
'''Sets up the session_data_manager e.a.'''
from Acquisition import aq_base
commit = 0
try:
from Products.TemporaryFolder.TemporaryFolder import \
MountedTemporaryFolder
from Products.Transience.Transience import TransientObjectContainer
from Products.Sessions.BrowserIdManager import BrowserIdManager
from Products.Sessions.SessionDataManager import SessionDataManager
except ImportError:
pass
else:
if not hasattr(app, 'temp_folder'):
tf = MountedTemporaryFolder('temp_folder', 'Temporary Folder')
app._setObject('temp_folder', tf)
commit = 1
if not hasattr(aq_base(app.temp_folder), 'session_data'):
toc = TransientObjectContainer(
'session_data',
'Session Data Container',
timeout_mins=3,
limit=100)
app.temp_folder._setObject('session_data', toc)
commit = 1
if not hasattr(app, 'browser_id_manager'):
bid = BrowserIdManager('browser_id_manager',
'Browser Id Manager')
app._setObject('browser_id_manager', bid)
commit = 1
if not hasattr(app, 'session_data_manager'):
sdm = SessionDataManager(
'session_data_manager',
title='Session Data Manager',
path='/temp_folder/session_data',
requestName='SESSION')
app._setObject('session_data_manager', sdm)
commit = 1
if commit:
transaction.commit()
@layer.appcall # BBB Zope 5.0
def setupSiteErrorLog(app): deprecated(
'''Sets up the error_log object required by ZPublisher.''' 'Please import from ZServer.Testing.utils.',
if not hasattr(app, 'error_log'): importObjectFromFile='ZServer.Testing.utils:importObjectFromFile',
try: setupCoreSessions='ZServer.Testing.utils:setupCoreSessions',
from Products.SiteErrorLog.SiteErrorLog import SiteErrorLog setupSiteErrorLog='ZServer.Testing.utils:setupSiteErrorLog',
except ImportError: startZServer='ZServer.Testing.utils:startZServer',
pass )
else:
app._setObject('error_log', SiteErrorLog())
transaction.commit()
deprecated(
def importObjectFromFile(container, filename, quiet=0): 'Please import from Testing.makerequest.',
'''Imports an object from a (.zexp) file into the given container.''' makerequest='Testing.makerequest:makerequest',
from ZopeLite import _print, _patched )
quiet = quiet or not _patched
start = time.time()
if not quiet:
_print("Importing %s ... " % os.path.basename(filename))
container._importObjectFromFile(filename, verify=0)
transaction.commit()
if not quiet:
_print('done (%.3fs)\n' % (time.time() - start))
_Z2HOST = None
_Z2PORT = None
def startZServer(number_of_threads=1, log=None):
'''Starts an HTTP ZServer thread.'''
global _Z2HOST, _Z2PORT
if _Z2HOST is None:
_Z2HOST = '127.0.0.1'
_Z2PORT = random.choice(range(55000, 55500))
from threadutils import setNumberOfThreads
setNumberOfThreads(number_of_threads)
from threadutils import QuietThread, zserverRunner
t = QuietThread(target=zserverRunner, args=(_Z2HOST, _Z2PORT, log))
t.setDaemon(1)
t.start()
time.sleep(0.1) # Sandor Palfy
return _Z2HOST, _Z2PORT
def makerequest(app, stdout=sys.stdout):
'''Wraps the app into a fresh REQUEST.'''
from Testing.makerequest import makerequest as _makerequest
environ = {}
environ['SERVER_NAME'] = _Z2HOST or 'nohost'
environ['SERVER_PORT'] = '%d' % (_Z2PORT or 80)
environ['REQUEST_METHOD'] = 'GET'
return _makerequest(app, stdout=stdout, environ=environ)
def appcall(func, *args, **kw): def appcall(func, *args, **kw):
...@@ -151,14 +52,3 @@ def makelist(arg): ...@@ -151,14 +52,3 @@ def makelist(arg):
if isinstance(arg, str): if isinstance(arg, str):
return filter(None, [arg]) return filter(None, [arg])
raise ValueError('Argument must be list, tuple, or string') raise ValueError('Argument must be list, tuple, or string')
__all__ = [
'setupCoreSessions',
'setupSiteErrorLog',
'startZServer',
'importObjectFromFile',
'appcall',
'makerequest',
'makelist',
]
...@@ -50,7 +50,7 @@ def makerequest(app, stdout=stdout, environ=None): ...@@ -50,7 +50,7 @@ def makerequest(app, stdout=stdout, environ=None):
if environ is None: if environ is None:
environ = {} environ = {}
resp = HTTPResponse(stdout=stdout) resp = HTTPResponse(stdout=stdout)
environ.setdefault('SERVER_NAME', 'foo') environ.setdefault('SERVER_NAME', 'nohost')
environ.setdefault('SERVER_PORT', '80') environ.setdefault('SERVER_PORT', '80')
environ.setdefault('REQUEST_METHOD', 'GET') environ.setdefault('REQUEST_METHOD', 'GET')
req = HTTPRequest(stdin, environ, resp) req = HTTPRequest(stdin, environ, resp)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment