Commit 968fe738 authored by Stefan H. Holek's avatar Stefan H. Holek

Made the ConnectionRegistry capable of storing OFS.Application objects

and pushed reponsibility for closing the REQUEST down to the registry.
parent b4216fe9
......@@ -24,7 +24,7 @@ The twist is that the portal object itself is *not* created
by the PortalTestCase class! Subclasses must make sure
getPortal() returns a usable portal object to the setup code.
$Id: PortalTestCase.py,v 1.38 2005/02/09 12:42:40 shh42 Exp $
$Id$
"""
import base
......@@ -111,7 +111,7 @@ class PortalTestCase(base.TestCase):
Note: This method should not be called by tests!
'''
return self.app[portal_name]
return getattr(self.app, portal_name)
def createMemberarea(self, name):
'''Creates a memberarea for the specified user.
......
......@@ -64,7 +64,7 @@ class ZopeTestCase(base.TestCase):
def _setupFolder(self):
'''Creates and configures the folder.'''
self.app.manage_addFolder(folder_name)
self.folder = self.app._getOb(folder_name)
self.folder = getattr(self.app, folder_name)
self.folder._addRole(user_role)
self.folder.manage_role(user_role, standard_permissions)
......@@ -81,7 +81,7 @@ class ZopeTestCase(base.TestCase):
'''Clears the fixture.'''
# This code is a wart from the olden days.
try:
if connections.contains(self.app._p_jar):
if connections.contains(self.app):
self.app._delObject(folder_name)
except:
pass
......
......@@ -16,7 +16,6 @@ $Id$
"""
import ZopeLite as Zope2
import unittest
import transaction
import profiler
......@@ -27,16 +26,17 @@ import connections
from AccessControl.SecurityManagement import noSecurityManager
def app():
'''Opens a ZODB connection and returns the app object.'''
app = Zope2.app()
connections.register(app._p_jar)
return utils.makerequest(app)
app = utils.makerequest(app)
connections.register(app)
return app
def close(app):
'''Closes the app's ZODB connection.'''
app.REQUEST.close()
connections.close(app._p_jar)
connections.close(app)
......@@ -119,8 +119,6 @@ class TestCase(profiler.Profiled, unittest.TestCase):
'''Clears the fixture.'''
if call_close_hook:
self.beforeClose()
if connections.contains(self.app._p_jar):
self.app.REQUEST.close()
self._close()
self.logout()
self.afterClear()
......
......@@ -16,7 +16,12 @@ $Id$
"""
class ConnectionRegistry:
'''ZODB connection registry'''
'''ZODB connection registry
This registry can hold either ZODB.Connection objects or OFS.Application
objects. In the latter case, a close operation will close the REQUEST as
well as the Connection referenced by the Application's _p_jar attribute.
'''
def __init__(self):
self._conns = []
......@@ -36,13 +41,20 @@ class ConnectionRegistry:
def close(self, conn):
if self.contains(conn):
self._conns.remove(conn)
conn.close()
self._do_close(conn)
def closeAll(self):
for conn in self._conns:
conn.close()
self._do_close(conn)
self._conns = []
def _do_close(self, conn):
if hasattr(conn, 'close'):
conn.close()
else:
conn.REQUEST.close()
conn._p_jar.close()
registry = ConnectionRegistry()
register = registry.register
......
......@@ -33,14 +33,15 @@ class Sandboxed:
def _app(self):
'''Returns the app object for a test.'''
app = Zope2.app(Zope2.sandbox().open())
connections.register(app._p_jar)
AppZapper().set(app)
return utils.makerequest(app)
app = utils.makerequest(app)
connections.register(app)
return app
def _close(self):
'''Clears the transaction and the AppZapper.'''
transaction.abort()
AppZapper().clear()
transaction.abort()
connections.closeAll()
......
......@@ -31,7 +31,9 @@ import transaction
from Testing.ZopeTestCase import base
from Testing.ZopeTestCase import utils
from Testing.ZopeTestCase import connections
from Testing.ZopeTestCase import sandbox
from Acquisition import aq_base
from AccessControl import getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager
......@@ -121,6 +123,36 @@ class TestTestCase(HookTest):
self._clear()
self.assertEqual(getSecurityManager().getUser().getUserName(), 'Anonymous User')
def testClearSurvivesDoubleCall(self):
self._called = []
self._clear()
self._clear()
self.assertHooks(['afterClear', 'afterClear'])
def testClearSurvivesClosedConnection(self):
self._called = []
self._close()
self._clear()
self.assertHooks(['afterClear'])
def testClearSurvivesBrokenApp(self):
self._called = []
self.app = None
self._clear()
self.assertHooks(['afterClear'])
def testClearSurvivesMissingApp(self):
self._called = []
delattr(self, 'app')
self._clear()
self.assertHooks(['afterClear'])
def testClearSurvivesMissingRequest(self):
self._called = []
self.app = aq_base(self.app)
self._clear()
self.assertHooks(['afterClear'])
def testCloseAbortsTransaction(self):
self.assertEqual(len(self.getObjectsInTransaction()), 0)
self.app.foo = 1
......@@ -160,7 +192,8 @@ class TestTestCase(HookTest):
class TestSetUpRaises(HookTest):
class Error: pass
class Error:
pass
def setUp(self):
try:
......@@ -180,7 +213,8 @@ class TestSetUpRaises(HookTest):
class TestTearDownRaises(HookTest):
class Error: pass
class Error:
pass
def tearDown(self):
self._called = []
......@@ -200,90 +234,109 @@ class TestTearDownRaises(HookTest):
class TestConnectionRegistry(base.TestCase):
'''Test the registry with Connection-like objects'''
class Conn:
closed = 0
_closed = 0
def close(self):
self.closed = 1
self._closed = 1
def closed(self):
return self._closed
Klass = Conn
def afterSetUp(self):
self.reg = connections.ConnectionRegistry()
self.conns = [self.Conn(), self.Conn(), self.Conn()]
self.conns = [self.Klass(), self.Klass(), self.Klass()]
for conn in self.conns:
self.reg.register(conn)
def testRegister(self):
# Should be able to register connections
for conn in self.conns:
self.reg.register(conn)
assert len(self.reg) == 3
assert self.reg.count() == 3
def testCloseConnection(self):
# Should be able to close a single registered connection
for conn in self.conns:
self.reg.register(conn)
assert len(self.reg) == 3
self.reg.close(self.conns[0])
assert len(self.reg) == 2
assert self.conns[0].closed == 1
assert self.conns[1].closed == 0
assert self.conns[2].closed == 0
assert self.conns[0].closed() == 1
assert self.conns[1].closed() == 0
assert self.conns[2].closed() == 0
def testCloseSeveralConnections(self):
# Should be able to close all registered connections one-by-one
for conn in self.conns:
self.reg.register(conn)
assert len(self.reg) == 3
self.reg.close(self.conns[0])
assert len(self.reg) == 2
assert self.conns[0].closed == 1
assert self.conns[1].closed == 0
assert self.conns[2].closed == 0
assert self.conns[0].closed() == 1
assert self.conns[1].closed() == 0
assert self.conns[2].closed() == 0
self.reg.close(self.conns[2])
assert len(self.reg) == 1
assert self.conns[0].closed == 1
assert self.conns[1].closed == 0
assert self.conns[2].closed == 1
assert self.conns[0].closed() == 1
assert self.conns[1].closed() == 0
assert self.conns[2].closed() == 1
self.reg.close(self.conns[1])
assert len(self.reg) == 0
assert self.conns[0].closed == 1
assert self.conns[1].closed == 1
assert self.conns[2].closed == 1
assert self.conns[0].closed() == 1
assert self.conns[1].closed() == 1
assert self.conns[2].closed() == 1
def testCloseForeignConnection(self):
# Should be able to close a connection that has not been registered
for conn in self.conns:
self.reg.register(conn)
assert len(self.reg) == 3
conn = self.Conn()
conn = self.Klass()
self.reg.close(conn)
assert len(self.reg) == 3
assert self.conns[0].closed == 0
assert self.conns[1].closed == 0
assert self.conns[2].closed == 0
assert conn.closed == 1
assert self.conns[0].closed() == 0
assert self.conns[1].closed() == 0
assert self.conns[2].closed() == 0
assert conn.closed() == 1
def testCloseAllConnections(self):
# Should be able to close all registered connections at once
for conn in self.conns:
self.reg.register(conn)
assert len(self.reg) == 3
self.reg.closeAll()
assert len(self.reg) == 0
assert self.conns[0].closed == 1
assert self.conns[1].closed == 1
assert self.conns[2].closed == 1
assert self.conns[0].closed() == 1
assert self.conns[1].closed() == 1
assert self.conns[2].closed() == 1
def testContains(self):
# Should be able to check if a connection is registered
for conn in self.conns:
self.reg.register(conn)
assert len(self.reg) == 3
assert self.reg.contains(self.conns[0])
assert self.reg.contains(self.conns[1])
assert self.reg.contains(self.conns[2])
class TestApplicationRegistry(TestConnectionRegistry):
'''Test the registry with Application-like objects'''
class App:
class Conn:
_closed = 0
def close(self):
self._closed = 1
def closed(self):
return self._closed
def __init__(self):
self.REQUEST = self.Conn()
self._p_jar = self.Conn()
def closed(self):
if self.REQUEST.closed() and self._p_jar.closed():
return 1
return 0
Klass = App
class TestListConverter(base.TestCase):
'''Test utils.makelist'''
def testList0(self):
self.assertEqual(utils.makelist([]), [])
......@@ -339,9 +392,12 @@ class TestRequestVariables(base.TestCase):
import gc
_sentinel1 = []
_sentinel2 = []
_sentinel3 = []
class TestRequestGarbage1(base.TestCase):
'''Make sure we do not leak REQUEST._held (and REQUEST.other)'''
'''Make sure base.app + base.close does not leak REQUEST._held'''
class Held:
def __del__(self):
......@@ -357,10 +413,8 @@ class TestRequestGarbage1(base.TestCase):
self.assertEqual(_sentinel1, ['__del__'])
_sentinel2 = []
class TestRequestGarbage2(base.TestCase):
'''Make sure we do not leak REQUEST._held (and REQUEST.other)'''
'''Make sure self._app + self._clear does not leak REQUEST._held'''
class Held:
def __del__(self):
......@@ -375,6 +429,22 @@ class TestRequestGarbage2(base.TestCase):
self.assertEqual(_sentinel2, ['__del__'])
class TestRequestGarbage3(sandbox.Sandboxed, base.TestCase):
'''Make sure self._app + self._clear does not leak REQUEST._held'''
class Held:
def __del__(self):
_sentinel3.append('__del__')
def afterSetUp(self):
self.app.REQUEST._hold(self.Held())
def testClearClosesRequest(self):
self._clear()
gc.collect()
self.assertEqual(_sentinel3, ['__del__'])
def test_suite():
from unittest import TestSuite, makeSuite
suite = TestSuite()
......@@ -382,10 +452,12 @@ def test_suite():
suite.addTest(makeSuite(TestSetUpRaises))
suite.addTest(makeSuite(TestTearDownRaises))
suite.addTest(makeSuite(TestConnectionRegistry))
suite.addTest(makeSuite(TestApplicationRegistry))
suite.addTest(makeSuite(TestListConverter))
suite.addTest(makeSuite(TestRequestVariables))
suite.addTest(makeSuite(TestRequestGarbage1))
suite.addTest(makeSuite(TestRequestGarbage2))
suite.addTest(makeSuite(TestRequestGarbage3))
return suite
if __name__ == '__main__':
......
......@@ -42,7 +42,7 @@ def hasattr_(ob, attr):
return hasattr(aq_base(ob), attr)
# Dummy Portal
# A dummy portal
from OFS.SimpleItem import SimpleItem
from OFS.Folder import Folder
......@@ -66,7 +66,7 @@ class DummyMembershipTool(SimpleItem):
portal.Members.manage_addFolder(member_id)
def getHomeFolder(self, member_id):
portal = self.aq_inner.aq_parent
return portal.Members[member_id]
return getattr(portal.Members, member_id)
class TestPortalTestCase(ZopeTestCase.PortalTestCase):
......@@ -78,7 +78,7 @@ class TestPortalTestCase(ZopeTestCase.PortalTestCase):
def getPortal(self):
# Must make sure we return a portal object
self.app._setObject(portal_name, DummyPortal(portal_name))
return self.app[portal_name]
return getattr(self.app, portal_name)
def setUp(self):
# For this test case we *want* to start
......@@ -391,7 +391,7 @@ class TestPlainUserFolder(ZopeTestCase.PortalTestCase):
def getPortal(self):
self.app._setObject(portal_name, DummyPortal(portal_name))
return self.app[portal_name]
return getattr(self.app, portal_name)
def testGetUserDoesNotWrapUser(self):
user = self.portal.acl_users.getUserById(user_name)
......@@ -412,7 +412,7 @@ class TestWrappingUserFolder(ZopeTestCase.PortalTestCase):
def getPortal(self):
self.app._setObject(portal_name, DummyPortal(portal_name))
return self.app[portal_name]
return getattr(self.app, portal_name)
def _setupUserFolder(self):
self.portal._setObject('acl_users', WrappingUserFolder())
......@@ -458,11 +458,12 @@ class HookTest(ZopeTestCase.PortalTestCase):
class TestSetUpRaises(HookTest):
class Error:
pass
def getPortal(self):
self.app._setObject(portal_name, DummyPortal(portal_name))
return self.app[portal_name]
class Error: pass
return getattr(self.app, portal_name)
def setUp(self):
try:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment