From ed4e67bc030f7fa24fbcc4e742ce55c1cf8d7cc3 Mon Sep 17 00:00:00 2001 From: Sebastien Robin <seb@nexedi.com> Date: Thu, 20 Jan 2011 18:13:35 +0000 Subject: [PATCH] create mixin class for code common between live tests and command line tests git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@42528 20353a03-c40f-0410-a6d1-a30d3c3de9de --- .../ERP5Type/tests/ERP5TypeLiveTestCase.py | 302 +------ product/ERP5Type/tests/ERP5TypeTestCase.py | 854 +++++++++--------- 2 files changed, 438 insertions(+), 718 deletions(-) diff --git a/product/ERP5Type/tests/ERP5TypeLiveTestCase.py b/product/ERP5Type/tests/ERP5TypeLiveTestCase.py index f97f5481b0..0e8349feea 100644 --- a/product/ERP5Type/tests/ERP5TypeLiveTestCase.py +++ b/product/ERP5Type/tests/ERP5TypeLiveTestCase.py @@ -35,6 +35,7 @@ from Testing.ZopeTestCase import PortalTestCase, user_name from Products.CMFCore.utils import getToolByName from Products.ERP5Type.tests.ProcessingNodeTestCase import ProcessingNodeTestCase from Products.ERP5Type.Globals import get_request +from ERP5TypeTestCase import ERP5TypeTestCaseMixin import transaction from zLOG import LOG, DEBUG, INFO @@ -58,7 +59,7 @@ from Products.ERP5Type.tests import ProcessingNodeTestCase as\ ProcessingNodeTestCaseModule ProcessingNodeTestCaseModule.patchActivityTool = lambda: None -class ERP5TypeLiveTestCase(ProcessingNodeTestCase, PortalTestCase): +class ERP5TypeLiveTestCase(ERP5TypeTestMixin): """ERP5TypeLiveTestCase is the default class for *all* tests in ERP5. It is designed with the idea in mind that tests should be run through the web. Command line based tests may be helpful @@ -73,18 +74,6 @@ class ERP5TypeLiveTestCase(ProcessingNodeTestCase, PortalTestCase): defined. """ - def shortDescription(self): - description = str(self) - doc = self._testMethodDoc - if doc and doc.split("\n")[0].strip(): - description += ', ' + doc.split("\n")[0].strip() - return description - - def getTitle(self): - """Returns the title of the test, for test reports. - """ - return str(self.__class__) - def getPortalName(self): """ Return the default ERP5 site id. """ @@ -102,47 +91,18 @@ class ERP5TypeLiveTestCase(ProcessingNodeTestCase, PortalTestCase): getPortalObject = getPortal - def login(self, user_name='ERP5TypeTestCase', quiet=0): - """ - Most of the time, we need to login before doing anything - """ - PortalTestCase.login(self, user_name) - - def logout(self): - PortalTestCase.logout(self) - # clean up certain cache related REQUEST keys that might be associated - # with the logged in user - for key in ('_ec_cache', '_oai_cache'): - pass - #self.REQUEST.other.pop(key, None) # XXX + #def logout(self): + # PortalTestCase.logout(self) + # # clean up certain cache related REQUEST keys that might be associated + # # with the logged in user + # for key in ('_ec_cache', '_oai_cache'): + # pass + # #self.REQUEST.other.pop(key, None) # XXX def _close(self): '''Closes the ZODB connection.''' transaction.abort() - # class-defined decorators for profiling. - # Depending on the environment variable, they return - # the same method, or a profiling wrapped call - _decorate_setUp = profile_if_environ('PROFILE_SETUP') - _decorate_testRun = profile_if_environ('PROFILE_TESTS') - _decorate_tearDown = profile_if_environ('PROFILE_TEARDOWN') - - def __call__(self, *args, **kw): - # Pulling down the profiling from ZopeTestCase.profiler to allow - # overriding run() - # This cannot be done at instanciation because we need to - # wrap the bottom-most methods, e.g. - # SecurityTestCase.tearDown instead of ERP5TestCase.tearDown - - self.setUp = self._decorate_setUp(self.setUp) - self.tearDown = self._decorate_tearDown(self.tearDown) - - test_name = self._testMethodName - test_method = getattr(self, test_name) - setattr(self, test_name, self._decorate_testRun(test_method)) - - self.run(*args, **kw) - def _setup(self): '''Configures the portal. Framework authors may override. @@ -188,171 +148,6 @@ class ERP5TypeLiveTestCase(ProcessingNodeTestCase, PortalTestCase): ''' pass - def logMessage(self, message): - """ - Shortcut function to log a message - """ - ZopeTestCase._print('\n%s ' % message) - LOG('Testing ... ', DEBUG, message) - - # Utility methods specific to ERP5Type - def getTemplateTool(self): - return getToolByName(self.getPortal(), 'portal_templates', None) - - def getPreferenceTool(self) : - return getToolByName(self.getPortal(), 'portal_preferences', None) - - def getTrashTool(self): - return getToolByName(self.getPortal(), 'portal_trash', None) - - def getPasswordTool(self): - return getToolByName(self.getPortal(), 'portal_password', None) - - def getSkinsTool(self): - return getToolByName(self.getPortal(), 'portal_skins', None) - - def getCategoryTool(self): - return getToolByName(self.getPortal(), 'portal_categories', None) - - def getWorkflowTool(self): - return getToolByName(self.getPortal(), 'portal_workflow', None) - - def getCatalogTool(self): - return getToolByName(self.getPortal(), 'portal_catalog', None) - - def getTypesTool(self): - return getToolByName(self.getPortal(), 'portal_types', None) - getTypeTool = getTypesTool - - def getRuleTool(self): - return getattr(self.getPortal(), 'portal_rules', None) - - def getClassTool(self): - return getattr(self.getPortal(), 'portal_classes', None) - - def getSimulationTool(self): - return getToolByName(self.getPortal(), 'portal_simulation', None) - - def getSQLConnection(self): - return getToolByName(self.getPortal(), 'erp5_sql_connection', None) - - def getPortalId(self): - return self.getPortal().getId() - - def getDomainTool(self): - return getToolByName(self.getPortal(), 'portal_domains', None) - - def getAlarmTool(self): - return getattr(self.getPortal(), 'portal_alarms', None) - - def getActivityTool(self): - return getattr(self.getPortal(), 'portal_activities', None) - - def getArchiveTool(self): - return getattr(self.getPortal(), 'portal_archives', None) - - def getCacheTool(self): - return getattr(self.getPortal(), 'portal_caches', None) - - def getOrganisationModule(self): - return getattr(self.getPortal(), 'organisation_module', - getattr(self.getPortal(), 'organisation', None)) - - def getPersonModule(self): - return getattr(self.getPortal(), 'person_module', - getattr(self.getPortal(), 'person', None)) - - def getCurrencyModule(self): - return getattr(self.getPortal(), 'currency_module', - getattr(self.getPortal(), 'currency', None)) - - def validateRules(self): - """ - try to validate all rules in rule_tool. - """ - rule_tool = self.getRuleTool() - for rule in rule_tool.contentValues( - portal_type=rule_tool.getPortalRuleTypeList()): - if rule.getValidationState() != 'validated': - rule.validate() - - def createSimpleUser(self, title, reference, function): - """ - Helper function to create a Simple ERP5 User. - User password is the reference. - - XXX-JPS do wa have a "delete" method etc. - """ - user = self.createUser(reference, person_kw=dict(title=title)) - assignment = self.createUserAssignement(user, assignment_kw=dict(function=function)) - return user - - def createUser(self, reference, password=None, person_kw=None): - """ - Create an ERP5 User. - Default password is the reference. - person_kw is passed as additional arguments when creating the person - """ - if password is None: - password = reference - if person_kw is None: - person_kw = dict() - - person = self.portal.person_module.newContent(portal_type='Person', - reference=reference, - password=password, - **person_kw) - return person - - def createUserAssignment(self, user, assignment_kw): - """ - Create an assignment to user. - """ - assignment = user.newContent(portal_type='Assignment', **assignment_kw) - assignment.open() - return assignment - - def createUserAssignement(self, user, assignment_kw): - # BBB - warn('createUserAssignement is deprecated;' - 'Use createUserAssignment instead', - DeprecationWarning) - return self.createUserAssignment(user, assignment_kw) - - def failIfDifferentSet(self, a, b, msg=""): - if not msg: - msg='%r != %r' % (a, b) - for i in a: - self.failUnless(i in b, msg) - for i in b: - self.failUnless(i in a, msg) - self.assertEquals(len(a), len(b), msg) - assertSameSet = failIfDifferentSet - - def assertWorkflowTransitionFails(self, object, workflow_id, transition_id, - error_message=None, state_variable='simulation_state'): - """ - Check that passing given transition from given workflow on given object - raises ValidationFailed. - Do sanity checks (workflow history length increased by one, simulation - state unchanged). - If error_message is provided, it is asserted to be equal to the last - workflow history error message. - """ - workflow_tool = self.getWorkflowTool() - reference_history_length = len(workflow_tool.getInfoFor(ob=object, name='history', wf_id=workflow_id)) - state_method = 'get' + convertToUpperCase(state_variable) - method = getattr(object, state_method, None) - reference_workflow_state = method() - self.assertRaises(ValidationFailed, workflow_tool.doActionFor, object, transition_id, wf_id=workflow_id) - workflow_history = workflow_tool.getInfoFor(ob=object, name='history', wf_id=workflow_id) - self.assertEqual(len(workflow_history), reference_history_length + 1) - workflow_error_message = str(workflow_history[-1]['error_message']) - if error_message is not None: - self.assertEqual(workflow_error_message, error_message) - self.assertEqual(method(), reference_workflow_state) - return workflow_error_message - def tearDown(self): '''Tears down the fixture. Do not override, use the hooks instead. @@ -365,85 +160,6 @@ class ERP5TypeLiveTestCase(ProcessingNodeTestCase, PortalTestCase): """ PortalTestCase.beforeClose(self) - def stepPdb(self, sequence=None, sequence_list=None): - """Invoke debugger""" - try: # try ipython if available - import IPython - IPython.Shell.IPShell(argv=[]) - tracer = IPython.Debugger.Tracer() - except ImportError: - from pdb import set_trace as tracer - tracer() - - def stepTic(self, **kw): - """ - The is used to simulate the zope_tic_loop script - Each time this method is called, it simulates a call to tic - which invoke activities in the Activity Tool - """ - if kw.get('sequence', None) is None: - # in case of using not in sequence commit transaction - transaction.commit() - self.tic() - - def publish(self, path, basic=None, env=None, extra=None, - request_method='GET', stdin=None, handle_errors=True): - '''Publishes the object at 'path' returning a response object.''' - - from ZPublisher.Response import Response - from ZPublisher.Test import publish_module - - from AccessControl.SecurityManagement import getSecurityManager - from AccessControl.SecurityManagement import setSecurityManager - - # Save current security manager - sm = getSecurityManager() - - # Commit the sandbox for good measure - transaction.commit() - - if env is None: - env = {} - if extra is None: - extra = {} - - request = self.app.REQUEST - - env['SERVER_NAME'] = request['SERVER_NAME'] - env['SERVER_PORT'] = request['SERVER_PORT'] - env['HTTP_ACCEPT_CHARSET'] = request['HTTP_ACCEPT_CHARSET'] - env['REQUEST_METHOD'] = request_method - - p = path.split('?') - if len(p) == 1: - env['PATH_INFO'] = p[0] - elif len(p) == 2: - [env['PATH_INFO'], env['QUERY_STRING']] = p - else: - raise TypeError, '' - - if basic: - env['HTTP_AUTHORIZATION'] = "Basic %s" % base64.encodestring(basic).replace('\012', '') - - if stdin is None: - stdin = StringIO() - - outstream = StringIO() - response = Response(stdout=outstream, stderr=sys.stderr) - - publish_module('Zope2', - response=response, - stdin=stdin, - environ=env, - extra=extra, - debug=not handle_errors, - ) - - # Restore security manager - setSecurityManager(sm) - - return ResponseWrapper(response, outstream, path) - def runLiveTest(test_list, verbosity=1, stream=None, **kw): from Products.ERP5Type.tests.runUnitTest import DebugTestResult from Products.ERP5Type.tests.runUnitTest import ERP5TypeTestLoader diff --git a/product/ERP5Type/tests/ERP5TypeTestCase.py b/product/ERP5Type/tests/ERP5TypeTestCase.py index c32aebb6cf..4bf8ff8508 100644 --- a/product/ERP5Type/tests/ERP5TypeTestCase.py +++ b/product/ERP5Type/tests/ERP5TypeTestCase.py @@ -274,10 +274,8 @@ def profile_if_environ(environment_var_name): # No profiling, return identity decorator return lambda self, method: method -class ERP5TypeTestCase(ProcessingNodeTestCase, PortalTestCase): - """TestCase for ERP5 based tests. - - This TestCase setups an ERP5Site and installs business templates. +class ERP5TypeTestMixin(ProcessingNodeTestCase, PortalTestCase): + """Mixin class for ERP5 based tests. """ def shortDescription(self): @@ -287,10 +285,6 @@ class ERP5TypeTestCase(ProcessingNodeTestCase, PortalTestCase): description += ', ' + doc.split("\n")[0].strip() return description - def dummy_test(self): - ZopeTestCase._print('All tests are skipped when --save option is passed ' - 'with --update_business_templates or without --load') - def getRevision(self): erp5_path = os.path.join(instancehome, 'Products', 'ERP5') try: @@ -304,6 +298,340 @@ class ERP5TypeTestCase(ProcessingNodeTestCase, PortalTestCase): """ return str(self.__class__) + def login(self, user_name='ERP5TypeTestCase', quiet=0): + """ + Most of the time, we need to login before doing anything + """ + PortalTestCase.login(self, user_name) + + def logout(self): + PortalTestCase.logout(self) + # clean up certain cache related REQUEST keys that might be associated + # with the logged in user + for key in ('_ec_cache', '_oai_cache'): + self.REQUEST.other.pop(key, None) + + def _setupUser(self): + '''Creates the default user.''' + uf = self.portal.acl_users + # do nothing if the user already exists + if not uf.getUser(user_name): + uf._doAddUser(user_name, 'secret', ['Member'], []) + + def getDefaultSitePreferenceId(self): + """Default id, usefull method to override + """ + return "default_site_preference" + + # Utility methods specific to ERP5Type + def getTemplateTool(self): + return getToolByName(self.getPortal(), 'portal_templates', None) + + def getPreferenceTool(self) : + return getToolByName(self.getPortal(), 'portal_preferences', None) + + def getTrashTool(self): + return getToolByName(self.getPortal(), 'portal_trash', None) + + def getPasswordTool(self): + return getToolByName(self.getPortal(), 'portal_password', None) + + def getSkinsTool(self): + return getToolByName(self.getPortal(), 'portal_skins', None) + + def getCategoryTool(self): + return getToolByName(self.getPortal(), 'portal_categories', None) + + def getWorkflowTool(self): + return getToolByName(self.getPortal(), 'portal_workflow', None) + + def getCatalogTool(self): + return getToolByName(self.getPortal(), 'portal_catalog', None) + + def getTypesTool(self): + return getToolByName(self.getPortal(), 'portal_types', None) + getTypeTool = getTypesTool + + def getRuleTool(self): + return getattr(self.getPortal(), 'portal_rules', None) + + def getClassTool(self): + return getattr(self.getPortal(), 'portal_classes', None) + + def getSimulationTool(self): + return getToolByName(self.getPortal(), 'portal_simulation', None) + + def getSQLConnection(self): + return getToolByName(self.getPortal(), 'erp5_sql_connection', None) + + def getPortalId(self): + return self.getPortal().getId() + + def getDomainTool(self): + return getToolByName(self.getPortal(), 'portal_domains', None) + + def getAlarmTool(self): + return getattr(self.getPortal(), 'portal_alarms', None) + + def getActivityTool(self): + return getattr(self.getPortal(), 'portal_activities', None) + + def getArchiveTool(self): + return getattr(self.getPortal(), 'portal_archives', None) + + def getCacheTool(self): + return getattr(self.getPortal(), 'portal_caches', None) + + def getOrganisationModule(self): + return getattr(self.getPortal(), 'organisation_module', + getattr(self.getPortal(), 'organisation', None)) + + def getPersonModule(self): + return getattr(self.getPortal(), 'person_module', + getattr(self.getPortal(), 'person', None)) + + def getCurrencyModule(self): + return getattr(self.getPortal(), 'currency_module', + getattr(self.getPortal(), 'currency', None)) + + def _addPropertySheet(self, portal_type_name, + property_sheet_name='TestPropertySheet', + property_sheet_code=None): + """Utility method to add a property sheet to a type information. + You might be interested in the higer level method _addProperty + This method registers all added property sheets, to be able to remove + them in tearDown. + """ + # install the 'real' class tool + class_tool = self.getClassTool() + + if property_sheet_code is not None: + class_tool.newPropertySheet(property_sheet_name) + # XXX need to commit the transaction at this point, because class tool + # files are no longer available to the current transaction. + transaction.commit() + class_tool.editPropertySheet(property_sheet_name, property_sheet_code) + transaction.commit() + class_tool.importPropertySheet(property_sheet_name) + + # We set the property sheet on the portal type + types_tool = self.getTypesTool() + ti = types_tool.getTypeInfo(portal_type_name) + property_sheet_set = set(ti.getTypePropertySheetList()) + property_sheet_set.add(property_sheet_name) + ti._setTypePropertySheetList(list(property_sheet_set)) + + # remember that we added a property sheet for tear down + self._added_property_sheets.setdefault( + portal_type_name, []).append(property_sheet_name) + # reset aq_dynamic cache + types_tool.resetDynamicDocuments() + + def getRule(self, **kw): + return self.portal.portal_rules.searchFolder( + sort_on='version', sort_order='descending', **kw)[0].getObject() + + def validateRules(self): + """ + try to validate all rules in rule_tool. + """ + rule_tool = self.getRuleTool() + for rule in rule_tool.contentValues( + portal_type=rule_tool.getPortalRuleTypeList()): + if rule.getValidationState() != 'validated': + rule.validate() + + def createSimpleUser(self, title, reference, function): + """ + Helper function to create a Simple ERP5 User. + User password is the reference. + """ + user = self.createUser(reference, person_kw=dict(title=title)) + assignment = self.createUserAssignement(user, assignment_kw=dict(function=function)) + return user + + def createUser(self, reference, password=None, person_kw=None): + """ + Create an ERP5 User. + Default password is the reference. + person_kw is passed as additional arguments when creating the person + """ + if password is None: + password = reference + if person_kw is None: + person_kw = dict() + + person = self.portal.person_module.newContent(portal_type='Person', + reference=reference, + password=password, + **person_kw) + return person + + def createUserAssignment(self, user, assignment_kw): + """ + Create an assignment to user. + """ + assignment = user.newContent(portal_type='Assignment', **assignment_kw) + assignment.open() + return assignment + + def createUserAssignement(self, user, assignment_kw): + # BBB + warn('createUserAssignement is deprecated;' + 'Use createUserAssignment instead', + DeprecationWarning) + return self.createUserAssignment(user, assignment_kw) + + def failIfDifferentSet(self, a, b, msg=""): + if not msg: + msg='%r != %r' % (a, b) + self.assertEquals(set(a), set(b), msg) + assertSameSet = failIfDifferentSet + + def assertWorkflowTransitionFails(self, object, workflow_id, transition_id, + error_message=None, state_variable='simulation_state'): + """ + Check that passing given transition from given workflow on given object + raises ValidationFailed. + Do sanity checks (workflow history length increased by one, simulation + state unchanged). + If error_message is provided, it is asserted to be equal to the last + workflow history error message. + """ + workflow_tool = self.getWorkflowTool() + reference_history_length = len(workflow_tool.getInfoFor(ob=object, name='history', wf_id=workflow_id)) + state_method = 'get' + convertToUpperCase(state_variable) + method = getattr(object, state_method, None) + reference_workflow_state = method() + self.assertRaises(ValidationFailed, workflow_tool.doActionFor, object, transition_id, wf_id=workflow_id) + workflow_history = workflow_tool.getInfoFor(ob=object, name='history', wf_id=workflow_id) + self.assertEqual(len(workflow_history), reference_history_length + 1) + workflow_error_message = str(workflow_history[-1]['error_message']) + if error_message is not None: + self.assertEqual(workflow_error_message, error_message) + self.assertEqual(method(), reference_workflow_state) + return workflow_error_message + + def stepPdb(self, sequence=None, sequence_list=None): + """Invoke debugger""" + try: # try ipython if available + import IPython + IPython.Shell.IPShell(argv=[]) + tracer = IPython.Debugger.Tracer() + except ImportError: + from pdb import set_trace as tracer + tracer() + + def stepTic(self, **kw): + """ + The is used to simulate the zope_tic_loop script + Each time this method is called, it simulates a call to tic + which invoke activities in the Activity Tool + """ + if kw.get('sequence', None) is None: + # in case of using not in sequence commit transaction + transaction.commit() + self.tic() + + getPortalObject = getPortal + + # class-defined decorators for profiling. + # Depending on the environment variable, they return + # the same method, or a profiling wrapped call + _decorate_setUp = profile_if_environ('PROFILE_SETUP') + _decorate_testRun = profile_if_environ('PROFILE_TESTS') + _decorate_tearDown = profile_if_environ('PROFILE_TEARDOWN') + + def __call__(self, *args, **kw): + # Pulling down the profiling from ZopeTestCase.profiler to allow + # overriding run() + # This cannot be done at instanciation because we need to + # wrap the bottom-most methods, e.g. + # SecurityTestCase.tearDown instead of ERP5TestCase.tearDown + + self.setUp = self._decorate_setUp(self.setUp) + self.tearDown = self._decorate_tearDown(self.tearDown) + + test_name = self._testMethodName + test_method = getattr(self, test_name) + setattr(self, test_name, self._decorate_testRun(test_method)) + + self.run(*args, **kw) + + def logMessage(self, message): + """ + Shortcut function to log a message + """ + ZopeTestCase._print('\n%s ' % message) + LOG('Testing ... ', DEBUG, message) + + def publish(self, path, basic=None, env=None, extra=None, + request_method='GET', stdin=None, handle_errors=True): + '''Publishes the object at 'path' returning a response object.''' + + from ZPublisher.Response import Response + from ZPublisher.Test import publish_module + + from AccessControl.SecurityManagement import getSecurityManager + from AccessControl.SecurityManagement import setSecurityManager + + # Save current security manager + sm = getSecurityManager() + + # Commit the sandbox for good measure + transaction.commit() + + if env is None: + env = {} + if extra is None: + extra = {} + + request = self.app.REQUEST + + env['SERVER_NAME'] = request['SERVER_NAME'] + env['SERVER_PORT'] = request['SERVER_PORT'] + env['HTTP_ACCEPT_CHARSET'] = request['HTTP_ACCEPT_CHARSET'] + env['REQUEST_METHOD'] = request_method + + p = path.split('?') + if len(p) == 1: + env['PATH_INFO'] = p[0] + elif len(p) == 2: + [env['PATH_INFO'], env['QUERY_STRING']] = p + else: + raise TypeError, '' + + if basic: + env['HTTP_AUTHORIZATION'] = "Basic %s" % base64.encodestring(basic).replace('\012', '') + + if stdin is None: + stdin = StringIO() + + outstream = StringIO() + response = Response(stdout=outstream, stderr=sys.stderr) + + publish_module('Zope2', + response=response, + stdin=stdin, + environ=env, + extra=extra, + debug=not handle_errors, + ) + + # Restore security manager + setSecurityManager(sm) + + return ResponseWrapper(response, outstream, path) +class ERP5TypeTestCase(ERP5TypeTestMixin): + """TestCase for ERP5 based tests. + + This TestCase setups an ERP5Site and installs business templates. + """ + + def dummy_test(self): + ZopeTestCase._print('All tests are skipped when --save option is passed ' + 'with --update_business_templates or without --load') + def getPortalName(self): """ Return the name of a portal for this test case. @@ -337,8 +665,6 @@ class ERP5TypeTestCase(ProcessingNodeTestCase, PortalTestCase): setSite(portal) return portal - getPortalObject = getPortal - def _app(self): '''Opens a ZODB connection and returns the app object. @@ -366,49 +692,6 @@ class ERP5TypeTestCase(ProcessingNodeTestCase, PortalTestCase): """ return 0 - def login(self, user_name='ERP5TypeTestCase', quiet=0): - """ - Most of the time, we need to login before doing anything - """ - PortalTestCase.login(self, user_name) - - def logout(self): - PortalTestCase.logout(self) - # clean up certain cache related REQUEST keys that might be associated - # with the logged in user - for key in ('_ec_cache', '_oai_cache'): - self.REQUEST.other.pop(key, None) - - def _setupUser(self): - '''Creates the default user.''' - uf = self.portal.acl_users - # do nothing if the user already exists - if not uf.getUser(user_name): - uf._doAddUser(user_name, 'secret', ['Member'], []) - - # class-defined decorators for profiling. - # Depending on the environment variable, they return - # the same method, or a profiling wrapped call - _decorate_setUp = profile_if_environ('PROFILE_SETUP') - _decorate_testRun = profile_if_environ('PROFILE_TESTS') - _decorate_tearDown = profile_if_environ('PROFILE_TEARDOWN') - - def __call__(self, *args, **kw): - # Pulling down the profiling from ZopeTestCase.profiler to allow - # overriding run() - # This cannot be done at instanciation because we need to - # wrap the bottom-most methods, e.g. - # SecurityTestCase.tearDown instead of ERP5TestCase.tearDown - - self.setUp = self._decorate_setUp(self.setUp) - self.tearDown = self._decorate_tearDown(self.tearDown) - - test_name = self._testMethodName - test_method = getattr(self, test_name) - setattr(self, test_name, self._decorate_testRun(test_method)) - - self.run(*args, **kw) - @staticmethod def _getBTPathAndIdList(template_list): bootstrap_path = os.environ.get('erp5_tests_bootstrap_path') or \ @@ -479,313 +762,112 @@ class ERP5TypeTestCase(ProcessingNodeTestCase, PortalTestCase): if uninstalled_list: getattr(portal, 'ERP5Site_updateTranslationTable', lambda: None)() self.stepTic() - return uninstalled_list - - def setUp(self): - '''Sets up the fixture. Do not override, - use the hooks instead. - ''' - from Products.CMFActivity.ActivityRuntimeEnvironment import BaseMessage - # Activities in unit tests shall never fail. - # Let's be a litte tolerant for the moment. - BaseMessage.max_retry = property(lambda self: - self.activity_kw.get('max_retry', 1)) - - use_dummy_mail_host = os.environ.get('use_dummy_mail_host', 0) - template_list = self.getBusinessTemplateList() - erp5_catalog_storage = os.environ.get('erp5_catalog_storage', - 'erp5_mysql_innodb_catalog') - update_business_templates = os.environ.get('update_business_templates') is not None - erp5_load_data_fs = int(os.environ.get('erp5_load_data_fs', 0)) - if update_business_templates and erp5_load_data_fs: - update_only = os.environ.get('update_only', None) - template_list = (erp5_catalog_storage, 'erp5_core', - 'erp5_xhtml_style') + tuple(template_list) - # Update only specified business templates, regular expression - # can be used. - if update_only is not None: - update_only_list = update_only.split(',') - matching_template_list = [] - # First parse the template list in order to keep same order - for business_template in template_list: - for expression in update_only_list: - if re.search(expression, business_template): - matching_template_list.append(business_template) - template_list = matching_template_list - - # keep a mapping type info name -> property sheet list, to remove them in - # tear down. - self._added_property_sheets = {} - light_install = self.enableLightInstall() - create_activities = self.enableActivityTool() - hot_reindexing = self.enableHotReindexing() - self.setUpERP5Site(business_template_list=template_list, - light_install=light_install, - create_activities=create_activities, - quiet=install_bt5_quiet, - hot_reindexing=hot_reindexing, - erp5_catalog_storage=erp5_catalog_storage, - use_dummy_mail_host=use_dummy_mail_host) - PortalTestCase.setUp(self) - - def afterSetUp(self): - '''Called after setUp() has completed. This is - far and away the most useful hook. - ''' - pass - - def getBusinessTemplateList(self): - """ - You must override this. Return the list of business templates. - """ - return () - - def logMessage(self, message): - """ - Shortcut function to log a message - """ - ZopeTestCase._print('\n%s ' % message) - LOG('Testing ... ', DEBUG, message) - - def _updateConnectionStrings(self): - """Update connection strings with values passed by the testRunner - """ - # update connection strings - for connection_string_name, connection_string in\ - _getConnectionStringDict().items(): - connection_name = connection_string_name.replace('_string', '') - getattr(self.portal, connection_name).edit('', connection_string) - - def _setUpDummyMailHost(self): - """Replace Original Mail Host by Dummy Mail Host. - """ - if 'MailHost' in self.portal.objectIds(): - self.portal.manage_delObjects(['MailHost']) - self.portal._setObject('MailHost', DummyMailHost('MailHost')) - - def _updateConversionServerConfiguration(self): - """Update conversion server (Oood) at default site preferences. - """ - conversion_dict = _getConversionServerDict() - preference = self.portal.portal_preferences[ - self.getDefaultSitePreferenceId()] - preference._setPreferredOoodocServerAddress(conversion_dict['hostname']) - preference._setPreferredOoodocServerPortNumber(conversion_dict['port']) - - def getDefaultSitePreferenceId(self): - """Default id, usefull method to override - """ - return "default_site_preference" - - def _recreateCatalog(self, quiet=0): - """Clear activities and catalog and recatalog everything. - Test runner can set `erp5_tests_recreate_catalog` environnement variable, - in that case we have to clear catalog. """ - if int(os.environ.get('erp5_tests_recreate_catalog', 0)): - try: - _start = time.time() - if not quiet: - ZopeTestCase._print('\nRecreating catalog ... ') - portal = self.getPortal() - portal.portal_activities.manageClearActivities() - portal.portal_catalog.manage_catalogClear() - transaction.commit() - portal.ERP5Site_reindexAll() - transaction.commit() - self.tic() - if not quiet: - ZopeTestCase._print('done (%.3fs)\n' % (time.time() - _start,)) - finally: - os.environ['erp5_tests_recreate_catalog'] = '0' - - # Utility methods specific to ERP5Type - def getTemplateTool(self): - return getToolByName(self.getPortal(), 'portal_templates', None) - - def getPreferenceTool(self) : - return getToolByName(self.getPortal(), 'portal_preferences', None) - - def getTrashTool(self): - return getToolByName(self.getPortal(), 'portal_trash', None) - - def getPasswordTool(self): - return getToolByName(self.getPortal(), 'portal_password', None) - - def getSkinsTool(self): - return getToolByName(self.getPortal(), 'portal_skins', None) - - def getCategoryTool(self): - return getToolByName(self.getPortal(), 'portal_categories', None) - - def getWorkflowTool(self): - return getToolByName(self.getPortal(), 'portal_workflow', None) - - def getCatalogTool(self): - return getToolByName(self.getPortal(), 'portal_catalog', None) - - def getTypesTool(self): - return getToolByName(self.getPortal(), 'portal_types', None) - getTypeTool = getTypesTool - - def getRuleTool(self): - return getattr(self.getPortal(), 'portal_rules', None) - - def getClassTool(self): - return getattr(self.getPortal(), 'portal_classes', None) - - def getSimulationTool(self): - return getToolByName(self.getPortal(), 'portal_simulation', None) - - def getSQLConnection(self): - return getToolByName(self.getPortal(), 'erp5_sql_connection', None) - - def getPortalId(self): - return self.getPortal().getId() - - def getDomainTool(self): - return getToolByName(self.getPortal(), 'portal_domains', None) - - def getAlarmTool(self): - return getattr(self.getPortal(), 'portal_alarms', None) - - def getActivityTool(self): - return getattr(self.getPortal(), 'portal_activities', None) - - def getArchiveTool(self): - return getattr(self.getPortal(), 'portal_archives', None) - - def getCacheTool(self): - return getattr(self.getPortal(), 'portal_caches', None) - - def getOrganisationModule(self): - return getattr(self.getPortal(), 'organisation_module', - getattr(self.getPortal(), 'organisation', None)) - - def getPersonModule(self): - return getattr(self.getPortal(), 'person_module', - getattr(self.getPortal(), 'person', None)) - - def getCurrencyModule(self): - return getattr(self.getPortal(), 'currency_module', - getattr(self.getPortal(), 'currency', None)) - - def _addPropertySheet(self, portal_type_name, - property_sheet_name='TestPropertySheet', - property_sheet_code=None): - """Utility method to add a property sheet to a type information. - You might be interested in the higer level method _addProperty - This method registers all added property sheets, to be able to remove - them in tearDown. - """ - # install the 'real' class tool - class_tool = self.getClassTool() - - if property_sheet_code is not None: - class_tool.newPropertySheet(property_sheet_name) - # XXX need to commit the transaction at this point, because class tool - # files are no longer available to the current transaction. - transaction.commit() - class_tool.editPropertySheet(property_sheet_name, property_sheet_code) - transaction.commit() - class_tool.importPropertySheet(property_sheet_name) + return uninstalled_list - # We set the property sheet on the portal type - types_tool = self.getTypesTool() - ti = types_tool.getTypeInfo(portal_type_name) - property_sheet_set = set(ti.getTypePropertySheetList()) - property_sheet_set.add(property_sheet_name) - ti._setTypePropertySheetList(list(property_sheet_set)) + def setUp(self): + '''Sets up the fixture. Do not override, + use the hooks instead. + ''' + from Products.CMFActivity.ActivityRuntimeEnvironment import BaseMessage + # Activities in unit tests shall never fail. + # Let's be a litte tolerant for the moment. + BaseMessage.max_retry = property(lambda self: + self.activity_kw.get('max_retry', 1)) - # remember that we added a property sheet for tear down - self._added_property_sheets.setdefault( - portal_type_name, []).append(property_sheet_name) - # reset aq_dynamic cache - types_tool.resetDynamicDocuments() + use_dummy_mail_host = os.environ.get('use_dummy_mail_host', 0) + template_list = self.getBusinessTemplateList() + erp5_catalog_storage = os.environ.get('erp5_catalog_storage', + 'erp5_mysql_innodb_catalog') + update_business_templates = os.environ.get('update_business_templates') is not None + erp5_load_data_fs = int(os.environ.get('erp5_load_data_fs', 0)) + if update_business_templates and erp5_load_data_fs: + update_only = os.environ.get('update_only', None) + template_list = (erp5_catalog_storage, 'erp5_core', + 'erp5_xhtml_style') + tuple(template_list) + # Update only specified business templates, regular expression + # can be used. + if update_only is not None: + update_only_list = update_only.split(',') + matching_template_list = [] + # First parse the template list in order to keep same order + for business_template in template_list: + for expression in update_only_list: + if re.search(expression, business_template): + matching_template_list.append(business_template) + template_list = matching_template_list - def getRule(self, **kw): - return self.portal.portal_rules.searchFolder( - sort_on='version', sort_order='descending', **kw)[0].getObject() + # keep a mapping type info name -> property sheet list, to remove them in + # tear down. + self._added_property_sheets = {} + light_install = self.enableLightInstall() + create_activities = self.enableActivityTool() + hot_reindexing = self.enableHotReindexing() + self.setUpERP5Site(business_template_list=template_list, + light_install=light_install, + create_activities=create_activities, + quiet=install_bt5_quiet, + hot_reindexing=hot_reindexing, + erp5_catalog_storage=erp5_catalog_storage, + use_dummy_mail_host=use_dummy_mail_host) + PortalTestCase.setUp(self) - def validateRules(self): - """ - try to validate all rules in rule_tool. - """ - rule_tool = self.getRuleTool() - for rule in rule_tool.contentValues( - portal_type=rule_tool.getPortalRuleTypeList()): - if rule.getValidationState() != 'validated': - rule.validate() + def afterSetUp(self): + '''Called after setUp() has completed. This is + far and away the most useful hook. + ''' + pass - def createSimpleUser(self, title, reference, function): + def getBusinessTemplateList(self): """ - Helper function to create a Simple ERP5 User. - User password is the reference. + You must override this. Return the list of business templates. """ - user = self.createUser(reference, person_kw=dict(title=title)) - assignment = self.createUserAssignement(user, assignment_kw=dict(function=function)) - return user + return () - def createUser(self, reference, password=None, person_kw=None): - """ - Create an ERP5 User. - Default password is the reference. - person_kw is passed as additional arguments when creating the person + def _updateConnectionStrings(self): + """Update connection strings with values passed by the testRunner """ - if password is None: - password = reference - if person_kw is None: - person_kw = dict() - - person = self.portal.person_module.newContent(portal_type='Person', - reference=reference, - password=password, - **person_kw) - return person + # update connection strings + for connection_string_name, connection_string in\ + _getConnectionStringDict().items(): + connection_name = connection_string_name.replace('_string', '') + getattr(self.portal, connection_name).edit('', connection_string) - def createUserAssignment(self, user, assignment_kw): - """ - Create an assignment to user. + def _setUpDummyMailHost(self): + """Replace Original Mail Host by Dummy Mail Host. """ - assignment = user.newContent(portal_type='Assignment', **assignment_kw) - assignment.open() - return assignment - - def createUserAssignement(self, user, assignment_kw): - # BBB - warn('createUserAssignement is deprecated;' - 'Use createUserAssignment instead', - DeprecationWarning) - return self.createUserAssignment(user, assignment_kw) - - def failIfDifferentSet(self, a, b, msg=""): - if not msg: - msg='%r != %r' % (a, b) - self.assertEquals(set(a), set(b), msg) - assertSameSet = failIfDifferentSet + if 'MailHost' in self.portal.objectIds(): + self.portal.manage_delObjects(['MailHost']) + self.portal._setObject('MailHost', DummyMailHost('MailHost')) - def assertWorkflowTransitionFails(self, object, workflow_id, transition_id, - error_message=None, state_variable='simulation_state'): - """ - Check that passing given transition from given workflow on given object - raises ValidationFailed. - Do sanity checks (workflow history length increased by one, simulation - state unchanged). - If error_message is provided, it is asserted to be equal to the last - workflow history error message. + def _updateConversionServerConfiguration(self): + """Update conversion server (Oood) at default site preferences. """ - workflow_tool = self.getWorkflowTool() - reference_history_length = len(workflow_tool.getInfoFor(ob=object, name='history', wf_id=workflow_id)) - state_method = 'get' + convertToUpperCase(state_variable) - method = getattr(object, state_method, None) - reference_workflow_state = method() - self.assertRaises(ValidationFailed, workflow_tool.doActionFor, object, transition_id, wf_id=workflow_id) - workflow_history = workflow_tool.getInfoFor(ob=object, name='history', wf_id=workflow_id) - self.assertEqual(len(workflow_history), reference_history_length + 1) - workflow_error_message = str(workflow_history[-1]['error_message']) - if error_message is not None: - self.assertEqual(workflow_error_message, error_message) - self.assertEqual(method(), reference_workflow_state) - return workflow_error_message + conversion_dict = _getConversionServerDict() + preference = self.portal.portal_preferences[ + self.getDefaultSitePreferenceId()] + preference._setPreferredOoodocServerAddress(conversion_dict['hostname']) + preference._setPreferredOoodocServerPortNumber(conversion_dict['port']) + + def _recreateCatalog(self, quiet=0): + """Clear activities and catalog and recatalog everything. + Test runner can set `erp5_tests_recreate_catalog` environnement variable, + in that case we have to clear catalog. """ + if int(os.environ.get('erp5_tests_recreate_catalog', 0)): + try: + _start = time.time() + if not quiet: + ZopeTestCase._print('\nRecreating catalog ... ') + portal = self.getPortal() + portal.portal_activities.manageClearActivities() + portal.portal_catalog.manage_catalogClear() + transaction.commit() + portal.ERP5Site_reindexAll() + transaction.commit() + self.tic() + if not quiet: + ZopeTestCase._print('done (%.3fs)\n' % (time.time() - _start,)) + finally: + os.environ['erp5_tests_recreate_catalog'] = '0' def _installBusinessTemplateList(self, business_template_list, light_install=True, @@ -1003,27 +1085,6 @@ class ERP5TypeTestCase(ProcessingNodeTestCase, PortalTestCase): break PortalTestCase.tearDown(self) - def stepPdb(self, sequence=None, sequence_list=None): - """Invoke debugger""" - try: # try ipython if available - import IPython - IPython.Shell.IPShell(argv=[]) - tracer = IPython.Debugger.Tracer() - except ImportError: - from pdb import set_trace as tracer - tracer() - - def stepTic(self, **kw): - """ - The is used to simulate the zope_tic_loop script - Each time this method is called, it simulates a call to tic - which invoke activities in the Activity Tool - """ - if kw.get('sequence', None) is None: - # in case of using not in sequence commit transaction - transaction.commit() - self.tic() - def importObjectFromFile(self, container, relative_path, **kw): """Import an object from a file located in $TESTFILEDIR/input/""" test_path = os.path.dirname(__file__) @@ -1033,63 +1094,6 @@ class ERP5TypeTestCase(ProcessingNodeTestCase, PortalTestCase): obj.manage_afterClone(obj) return obj - def publish(self, path, basic=None, env=None, extra=None, - request_method='GET', stdin=None, handle_errors=True): - '''Publishes the object at 'path' returning a response object.''' - - from ZPublisher.Response import Response - from ZPublisher.Test import publish_module - - from AccessControl.SecurityManagement import getSecurityManager - from AccessControl.SecurityManagement import setSecurityManager - - # Save current security manager - sm = getSecurityManager() - - # Commit the sandbox for good measure - transaction.commit() - - if env is None: - env = {} - if extra is None: - extra = {} - - request = self.app.REQUEST - - env['SERVER_NAME'] = request['SERVER_NAME'] - env['SERVER_PORT'] = request['SERVER_PORT'] - env['HTTP_ACCEPT_CHARSET'] = request['HTTP_ACCEPT_CHARSET'] - env['REQUEST_METHOD'] = request_method - - p = path.split('?') - if len(p) == 1: - env['PATH_INFO'] = p[0] - elif len(p) == 2: - [env['PATH_INFO'], env['QUERY_STRING']] = p - else: - raise TypeError, '' - - if basic: - env['HTTP_AUTHORIZATION'] = "Basic %s" % base64.encodestring(basic).replace('\012', '') - - if stdin is None: - stdin = StringIO() - - outstream = StringIO() - response = Response(stdout=outstream, stderr=sys.stderr) - - publish_module('Zope2', - response=response, - stdin=stdin, - environ=env, - extra=extra, - debug=not handle_errors, - ) - - # Restore security manager - setSecurityManager(sm) - - return ResponseWrapper(response, outstream, path) from Products.ERP5 import ERP5Site ERP5Site.getBootstrapBusinessTemplateUrl = lambda bt_title: \ -- 2.30.9