From b2ad5daddf181c75ef8b7ce804688beadcc21133 Mon Sep 17 00:00:00 2001 From: Nicolas Delaby <nicolas@nexedi.com> Date: Thu, 14 May 2009 09:47:38 +0000 Subject: [PATCH] Cleanup test * harmonize indentation * import transaction * remove quiet and run_all_test variables * cosmetic changes git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@26973 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/ERP5Type/tests/testCacheTool.py | 360 +++++++++++------------- 1 file changed, 157 insertions(+), 203 deletions(-) diff --git a/product/ERP5Type/tests/testCacheTool.py b/product/ERP5Type/tests/testCacheTool.py index e473a8f8a4..861bda5631 100644 --- a/product/ERP5Type/tests/testCacheTool.py +++ b/product/ERP5Type/tests/testCacheTool.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- ############################################################################## # # Copyright (c) 2005 Nexedi SARL and Contributors. All Rights Reserved. @@ -34,18 +35,14 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.CachePlugins.DummyCache import DummyCache from AccessControl.SecurityManagement import newSecurityManager from zLOG import LOG - -try: - from transaction import get as get_transaction -except ImportError: - pass +import transaction class TestingCache(DummyCache): """A dummy cache that mark cache miss, so that you can later count access using getCacheMisses() """ def __init__(self, params): DummyCache.__init__(self, params) - + def __call__(self, callable_object, cache_id, scope, cache_duration=None, *args, **kwd): self.markCacheMiss(1) @@ -53,131 +50,102 @@ class TestingCache(DummyCache): class TestCacheTool(ERP5TypeTestCase): - run_all_test = 1 - def getTitle(self): return "Cache Tool" - + def afterSetUp(self): self.login() - - def login(self, quiet=0, run=run_all_test): + + def login(self): uf = self.getPortal().acl_users - uf._doAddUser('seb', '', ['Manager'], []) + uf._doAddUser('admin', '', ['Manager'], []) uf._doAddUser('ERP5TypeTestCase', '', ['Manager'], []) - user = uf.getUserById('seb').__of__(uf) + user = uf.getUserById('admin').__of__(uf) newSecurityManager(None, user) - def test_01_CheckCacheTool(self, quiet=0, run=run_all_test): - if not run: - return - if not quiet: - message = '\nCheck CacheTool ' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - - portal = self.getPortal() - self.assertNotEqual(None,getattr(portal,'portal_caches',None)) - get_transaction().commit() - - - def test_02_CheckPortalTypes(self, quiet=0, run=run_all_test): - if not run: - return - if not quiet: - message = '\nCheck Portal Types' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - - portal = self.getPortal() - portal_types = portal.portal_types - typeinfo_names = ("Cache Factory", + def test_01_CheckCacheTool(self): + portal = self.getPortal() + self.assertNotEqual(None,getattr(portal,'portal_caches',None)) + transaction.commit() + + def test_02_CheckPortalTypes(self): + portal = self.getPortal() + portal_types = portal.portal_types + typeinfo_names = ("Cache Factory", "Ram Cache", "Distributed Ram Cache", "SQL Cache", ) - for typeinfo_name in typeinfo_names: - portal_type = getattr(portal_types,typeinfo_name,None) - self.assertNotEqual(None,portal_type) - get_transaction().commit() - - def test_03_CreateCacheFactories(self, quiet=0, run=run_all_test): - if not run: - return - if not quiet: - message = '\nCreate Cache Tool Factories' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - portal = self.getPortal() - portal_caches = portal.portal_caches - - # Cache plugins are organised into 'Cache factories' so we create - # factories first ram_cache_factory (to test Ram Cache Plugin) - ram_cache_factory = portal_caches.newContent(portal_type="Cache Factory", - id = 'ram_cache_factory', - container=portal_caches) - ram_cache_plugin = ram_cache_factory.newContent(portal_type="Ram Cache", - container=ram_cache_factory) - ram_cache_plugin.setIntIndex(0) - - - ## distributed_ram_cache_factory (to test Distributed Ram Cache Plugin) - dram_cache_factory = portal_caches.newContent(portal_type="Cache Factory", - id = 'distributed_ram_cache_factory', - container=portal_caches) - dram_cache_plugin = dram_cache_factory.newContent( - portal_type="Distributed Ram Cache", container=dram_cache_factory) - dram_cache_plugin.setIntIndex(0) - - ## sql_cache_factory (to test SQL Cache Plugin) - sql_cache_factory = portal_caches.newContent(portal_type="Cache Factory", - id = 'sql_cache_factory', - container=portal_caches) - sql_cache_plugin = sql_cache_factory.newContent( - portal_type="SQL Cache", container=sql_cache_factory) - sql_cache_plugin.setIntIndex(0) - - ## erp5_user_factory (to test a combination of all cache plugins) - erp5_user_factory = portal_caches.newContent(portal_type="Cache Factory", - id = "erp5_user_factory", - container=portal_caches) - - ram_cache_plugin = erp5_user_factory.newContent( - portal_type="Ram Cache", container=erp5_user_factory) - ram_cache_plugin.setIntIndex(0) - dram_cache_plugin = erp5_user_factory.newContent( - portal_type="Distributed Ram Cache", container=erp5_user_factory) - dram_cache_plugin.setIntIndex(1) - sql_cache_plugin = erp5_user_factory.newContent( - portal_type="SQL Cache", container=erp5_user_factory) - sql_cache_plugin.setIntIndex(2) - - ## - get_transaction().commit() - - ## update Ram Cache structure - portal_caches.updateCache() - from Products.ERP5Type.Cache import CachingMethod - - ## do we have the same structure we created above? - self.assert_('ram_cache_factory' in CachingMethod.factories) - self.assert_('distributed_ram_cache_factory' in CachingMethod.factories) - self.assert_('sql_cache_factory' in CachingMethod.factories) - self.assert_('erp5_user_factory' in CachingMethod.factories) - - def test_04_CreateCachedMethod(self, quiet=0, run=run_all_test): - if not run: - return - if not quiet: - message = '\nCreate Cache Method (Python Script)' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - portal = self.getPortal() - - ## add test cached method - py_script_id = "testCachedMethod" - py_script_params = "value=10000, portal_path=('','erp5')" - py_script_body = """ + for typeinfo_name in typeinfo_names: + portal_type = getattr(portal_types,typeinfo_name,None) + self.assertNotEqual(None,portal_type) + transaction.commit() + + def test_03_CreateCacheFactories(self): + portal = self.getPortal() + portal_caches = portal.portal_caches + + # Cache plugins are organised into 'Cache factories' so we create + # factories first ram_cache_factory (to test Ram Cache Plugin) + ram_cache_factory = portal_caches.newContent(portal_type="Cache Factory", + id = 'ram_cache_factory', + container=portal_caches) + ram_cache_plugin = ram_cache_factory.newContent(portal_type="Ram Cache", + container=ram_cache_factory) + ram_cache_plugin.setIntIndex(0) + + + ## distributed_ram_cache_factory (to test Distributed Ram Cache Plugin) + dram_cache_factory = portal_caches.newContent(portal_type="Cache Factory", + id = 'distributed_ram_cache_factory', + container=portal_caches) + dram_cache_plugin = dram_cache_factory.newContent( + portal_type="Distributed Ram Cache", container=dram_cache_factory) + dram_cache_plugin.setIntIndex(0) + + ## sql_cache_factory (to test SQL Cache Plugin) + sql_cache_factory = portal_caches.newContent(portal_type="Cache Factory", + id = 'sql_cache_factory', + container=portal_caches) + sql_cache_plugin = sql_cache_factory.newContent( + portal_type="SQL Cache", container=sql_cache_factory) + sql_cache_plugin.setIntIndex(0) + + ## erp5_user_factory (to test a combination of all cache plugins) + erp5_user_factory = portal_caches.newContent(portal_type="Cache Factory", + id = "erp5_user_factory", + container=portal_caches) + + ram_cache_plugin = erp5_user_factory.newContent( + portal_type="Ram Cache", container=erp5_user_factory) + ram_cache_plugin.setIntIndex(0) + dram_cache_plugin = erp5_user_factory.newContent( + portal_type="Distributed Ram Cache", container=erp5_user_factory) + dram_cache_plugin.setIntIndex(1) + sql_cache_plugin = erp5_user_factory.newContent( + portal_type="SQL Cache", container=erp5_user_factory) + sql_cache_plugin.setIntIndex(2) + + ## + transaction.commit() + + ## update Ram Cache structure + portal_caches.updateCache() + from Products.ERP5Type.Cache import CachingMethod + + ## do we have the same structure we created above? + self.assert_('ram_cache_factory' in CachingMethod.factories) + self.assert_('distributed_ram_cache_factory' in CachingMethod.factories) + self.assert_('sql_cache_factory' in CachingMethod.factories) + self.assert_('erp5_user_factory' in CachingMethod.factories) + + def test_04_CreateCachedMethod(self): + portal = self.getPortal() + + ## add test cached method + py_script_id = "testCachedMethod" + py_script_params = "value=10000, portal_path=('','erp5')" + py_script_body = """ def veryExpensiveMethod(value): ## do something expensive for some time ## no 'time.sleep()' available in Zope @@ -190,96 +158,83 @@ def veryExpensiveMethod(value): result = veryExpensiveMethod(value) return result """ - portal.manage_addProduct['PythonScripts'].manage_addPythonScript( - id=py_script_id) - py_script_obj = getattr(portal, py_script_id) - py_script_obj.ZPythonScript_edit(py_script_params, py_script_body) - get_transaction().commit() + portal.manage_addProduct['PythonScripts'].manage_addPythonScript( + id=py_script_id) + py_script_obj = getattr(portal, py_script_id) + py_script_obj.ZPythonScript_edit(py_script_params, py_script_body) + transaction.commit() - def test_05_CacheFactoryOnePlugin(self, quiet=0, run=run_all_test): + def test_05_CacheFactoryOnePlugin(self): """ Test cache factory containing only one cache plugin. """ - if not run: - return - if not quiet: - message = '\nTest each type of cache plugin individually.' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - portal = self.getPortal() - from Products.ERP5Type.Cache import CachingMethod - py_script_id = "testCachedMethod" - py_script_obj = getattr(portal, py_script_id) - for cf_name in ('ram_cache_factory', - 'distributed_ram_cache_factory', - 'sql_cache_factory'): - my_cache = CachingMethod(py_script_obj, - 'py_script_obj', - cache_factory=cf_name) - self._cacheFactoryInstanceTest(my_cache, cf_name) - - def test_06_CacheFactoryMultiPlugins(self, quiet=0, run=run_all_test): - """ Test a cache factory containing multiple cache plugins. """ - if not run: - return - if not quiet: - message = '\nTest combination of available cache plugins under a cache'\ - ' factory' - ZopeTestCase._print(message) - LOG('Testing... ',0,message) - portal = self.getPortal() - from Products.ERP5Type.Cache import CachingMethod - py_script_id = "testCachedMethod" - py_script_obj = getattr(portal, py_script_id) - cf_name = 'erp5_user_factory' + portal = self.getPortal() + from Products.ERP5Type.Cache import CachingMethod + py_script_id = "testCachedMethod" + py_script_obj = getattr(portal, py_script_id) + for cf_name in ('ram_cache_factory', + 'distributed_ram_cache_factory', + 'sql_cache_factory'): my_cache = CachingMethod(py_script_obj, 'py_script_obj', cache_factory=cf_name) self._cacheFactoryInstanceTest(my_cache, cf_name) - + + def test_06_CacheFactoryMultiPlugins(self): + """ Test a cache factory containing multiple cache plugins. """ + portal = self.getPortal() + from Products.ERP5Type.Cache import CachingMethod + py_script_id = "testCachedMethod" + py_script_obj = getattr(portal, py_script_id) + cf_name = 'erp5_user_factory' + my_cache = CachingMethod(py_script_obj, + 'py_script_obj', + cache_factory=cf_name) + self._cacheFactoryInstanceTest(my_cache, cf_name) + def _cacheFactoryInstanceTest(self, my_cache, cf_name): - portal = self.getPortal() - print - print "="*40 - print "TESTING:", cf_name - - # if the test fails because your machine is too fast, increase this value. - nb_iterations = 30000 - - portal.portal_caches.clearCacheFactory(cf_name) - ## 1st call - start = time.time() - original = my_cache(nb_iterations, portal_path=('', portal.getId())) - end = time.time() - calculation_time = end-start - print "\n\tCalculation time (1st call)", calculation_time - - ## 2nd call - should be cached now - start = time.time() - cached = my_cache(nb_iterations, portal_path=('', portal.getId())) - end = time.time() - calculation_time = end-start - print "\n\tCalculation time (2nd call)", calculation_time - - # check if cache works by getting calculation_time for last cache - # operation even remote cache must have access time less than a second. - # if it's greater than method wasn't previously cached and was calculated - # instead - self.assert_(1.0 > calculation_time) - - ## check if equal. - self.assertEquals(original, cached) - - ## OK so far let's clear cache - portal.portal_caches.clearCacheFactory(cf_name) - - ## 1st call - start = time.time() - original = my_cache(nb_iterations, portal_path=('', portal.getId())) - end = time.time() - calculation_time = end-start - print "\n\tCalculation time (after cache clear)", calculation_time - - ## Cache cleared shouldn't be previously cached - self.assert_(1.0 < calculation_time) + portal = self.getPortal() + print + print "="*40 + print "TESTING:", cf_name + + # if the test fails because your machine is too fast, increase this value. + nb_iterations = 30000 + + portal.portal_caches.clearCacheFactory(cf_name) + ## 1st call + start = time.time() + original = my_cache(nb_iterations, portal_path=('', portal.getId())) + end = time.time() + calculation_time = end-start + print "\n\tCalculation time (1st call)", calculation_time + + ## 2nd call - should be cached now + start = time.time() + cached = my_cache(nb_iterations, portal_path=('', portal.getId())) + end = time.time() + calculation_time = end-start + print "\n\tCalculation time (2nd call)", calculation_time + + # check if cache works by getting calculation_time for last cache + # operation even remote cache must have access time less than a second. + # if it's greater than method wasn't previously cached and was calculated + # instead + self.assert_(1.0 > calculation_time) + + ## check if equal. + self.assertEquals(original, cached) + + ## OK so far let's clear cache + portal.portal_caches.clearCacheFactory(cf_name) + + ## 1st call + start = time.time() + original = my_cache(nb_iterations, portal_path=('', portal.getId())) + end = time.time() + calculation_time = end-start + print "\n\tCalculation time (after cache clear)", calculation_time + + ## Cache cleared shouldn't be previously cached + self.assert_(1.0 < calculation_time) def test_CachePersistentObjects(self): # storing persistent objects in cache is not allowed, but this check is @@ -296,7 +251,6 @@ return result return self.portal.getTitle cached_func = CachingMethod(func, 'cache_bound_method') self.assertRaises(TypeError, cached_func) - def test_suite(): suite = unittest.TestSuite() -- 2.30.9