Commit c6cb5258 authored by Jérome Perrin's avatar Jérome Perrin

Prepare test framework for python3 compatibility

Some general changes to test framework and shared test code

See merge request !1936
parents 81268f58 13526eb5
......@@ -6323,7 +6323,7 @@ class TestInternalInvoiceTransaction(AccountingTestCase):
'Internal Invoice Transaction',
payment.getPortalType())
self.assertEqual(internal_invoice, payment.getCausalityValue())
self.assertItemsEqual(
self.assertCountEqual(
[ (self.portal.account_module.bank, 100, 0),
(self.portal.account_module.receivable, 0, 100), ],
[ (line.getSourceValue(), line.getSourceDebit(), line.getSourceCredit())
......
......@@ -666,7 +666,7 @@ class TestBankReconciliation(AccountingTestCase, ERP5ReportTestCase):
[x.getObject() for x in
bank_reconciliation_for_main_section.BankReconciliation_getAccountingTransactionLineList()])
self.assertItemsEqual(
self.assertCountEqual(
[bank_reconciliation_for_section],
internal_transaction.bank.getAggregateValueList())
......@@ -692,7 +692,7 @@ class TestBankReconciliation(AccountingTestCase, ERP5ReportTestCase):
[x.getObject() for x in
bank_reconciliation_for_section.BankReconciliation_getAccountingTransactionLineList()])
self.assertItemsEqual(
self.assertCountEqual(
[bank_reconciliation_for_section, bank_reconciliation_for_main_section],
internal_transaction.bank.getAggregateValueList())
......@@ -719,7 +719,7 @@ class TestBankReconciliation(AccountingTestCase, ERP5ReportTestCase):
[x.getObject() for x in
bank_reconciliation_for_main_section.BankReconciliation_getAccountingTransactionLineList()])
self.assertItemsEqual(
self.assertCountEqual(
[bank_reconciliation_for_main_section],
internal_transaction.bank.getAggregateValueList())
......
......@@ -1163,7 +1163,7 @@ class StandardConfigurationMixin(TestLiveConfiguratorWorkflowMixin):
solver_decision, = sale_packing_list.Delivery_getSolverDecisionList()
self.assertItemsEqual(
self.assertCountEqual(
solver_decision.getCausalityValue().getSolverValueList(),
[
self.portal.portal_solvers['Simple Quantity Split Solver'],
......@@ -1601,7 +1601,7 @@ class StandardConfigurationMixin(TestLiveConfiguratorWorkflowMixin):
solver_decision, = purchase_packing_list.Delivery_getSolverDecisionList()
self.assertItemsEqual(
self.assertCountEqual(
solver_decision.getCausalityValue().getSolverValueList(),
[
self.portal.portal_solvers['Simple Quantity Split Solver'],
......
......@@ -234,7 +234,7 @@ class TestAlarm(AlarmTestCase):
tag_set.add(m.activity_kw.get('after_tag'))
elif m.method_id in (sense_method_id, 'immediateReindexObject'):
tag_set.add(m.activity_kw.get('tag'))
self.assertItemsEqual(method_id_list, expected_method_list)
self.assertCountEqual(method_id_list, expected_method_list)
self.assertEqual(len(tag_set), 1, tag_set)
# check tags after activeSense
assertSingleTagAndMethodItemsEqual(['notify', sense_method_id])
......
......@@ -36,6 +36,7 @@ from Acquisition import aq_base
from App.config import getConfiguration
from Products.ERP5Type.tests.Sequence import SequenceList, Sequence
from urllib import pathname2url
from Testing import ZopeTestCase
from Products.ERP5Type.Globals import PersistentMapping
from Products.ERP5Type.dynamic.lazy_class import ERP5BaseBroken
from Products.ERP5Type.tests.utils import LogInterceptor
......@@ -201,6 +202,30 @@ class BusinessTemplateMixin(ERP5TypeTestCase, LogInterceptor):
property_sheet_tool.manage_delObjects([property_sheet])
self.commit()
self._ignore_log_errors()
self.cancelFailedActivities()
def cancelFailedActivities(self):
"""Cancel failed activities and mark this test as failed if there was any.
We do this because we don't want a test which did not fail but left failing
activities to succeed. Test which failed and also left failing activities
will count as 2 failures, so with this method it may happen that the number
of failures is higher than the number of tests.
"""
self.abort()
try:
self.tic()
except RuntimeError: # "tic is running forever"
activity_tool = self.portal.portal_activities
for message in activity_tool.getMessageList():
activity_tool.manageDelete(message.uid, message.activity)
ZopeTestCase._print('\nCancelling active message %s/%s\n'
% ('/'.join(message.object_path), message.method_id) )
self.commit()
# if activity failed, it's most probably because of broken components,
# so try to reset to not leave broken components for next tests
self.portal.portal_components.reset(force=True)
self.fail("A previous Tic failed")
def getBusinessTemplate(self,title):
"""
......@@ -1699,7 +1724,7 @@ class BusinessTemplateMixin(ERP5TypeTestCase, LogInterceptor):
# check filter
filter_dict = catalog._getFilterDict()
filter_ = filter_dict[method_id]
self.assertItemsEqual(filter_['expression_cache_key'], ['portal_type'])
self.assertCountEqual(filter_['expression_cache_key'], ['portal_type'])
self.assertEqual(filter_['type'], [])
self.assertEqual(filter_['filtered'], 1)
self.assertEqual(filter_['expression'], 'python: context.isPredicate()')
......@@ -1716,7 +1741,7 @@ class BusinessTemplateMixin(ERP5TypeTestCase, LogInterceptor):
# check filter
filter_dict = catalog._getFilterDict()
filter_ = filter_dict[method_id]
self.assertItemsEqual(filter_['expression_cache_key'], ['portal_type'])
self.assertCountEqual(filter_['expression_cache_key'], ['portal_type'])
self.assertEqual(filter_['type'], [])
self.assertEqual(filter_['filtered'], 1)
self.assertEqual(filter_['expression'], 'python: context.isDelivery()')
......
......@@ -1624,7 +1624,7 @@ class TestERP5Base(ERP5TypeTestCase):
newContent(portal_type='Person', title='Owned by user_3')
login()
self.tic()
self.assertItemsEqual(
self.assertCountEqual(
must_find_path_list,
[
x.path
......
......@@ -571,7 +571,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
# otherwise it returns the object
self.assertEqual(obj, portal_catalog.getObject(obj.getUid()).getObject())
# but raises KeyError if object is not in catalog
self.assertRaises(KeyError, portal_catalog.getObject, sys.maxint)
self.assertRaises(KeyError, portal_catalog.getObject, sys.maxsize)
self.assertRaises(KeyError, portal_catalog.getObject, -1)
def test_getRecordForUid(self):
portal_catalog = self.getCatalogTool()
......@@ -583,7 +584,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
portal_catalog = self.getCatalogTool()
obj = self._makeOrganisation()
self.assertEqual(obj.getPath(), portal_catalog.getpath(obj.getUid()))
self.assertRaises(KeyError, portal_catalog.getpath, sys.maxint)
self.assertRaises(KeyError, portal_catalog.getpath, sys.maxsize)
self.assertRaises(KeyError, portal_catalog.getpath, -1)
def test_16_newUid(self):
# newUid should not assign the same uid
......
......@@ -129,7 +129,7 @@ class TestTrashTool(ERP5TypeTestCase):
trashbin_uid = trashbin.getUid()
self.assertNotEqual(trash_uid, None)
self.assertNotEqual(trashbin_uid, None)
self.assertItemsEqual(
self.assertCountEqual(
[
x.path
for x in self.portal.portal_catalog(
......
......@@ -121,7 +121,7 @@ class TestVanillaERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.tic()
# Check if all objects are catalogued as before
self.maxDiff = None
self.assertItemsEqual(original_path_list, self.getSQLPathList())
self.assertCountEqual(original_path_list, self.getSQLPathList())
# Note: this test is only working as a sinde-effect of
# test_1_ERP5Site_reindexAll being run first (it produces a "clean" catalog).
......@@ -172,7 +172,7 @@ class TestVanillaERP5Catalog(ERP5TypeTestCase, LogInterceptor):
original_path_list = self.getSQLPathList(original_connection_id)
new_path_list = self.getSQLPathList(self.new_erp5_sql_connection)
self.maxDiff = None
self.assertItemsEqual(original_path_list, new_path_list)
self.assertCountEqual(original_path_list, new_path_list)
organisation2 = module.newContent(portal_type='Organisation', title="GreatTitle2")
first_deleted_url = organisation2.getRelativeUrl()
self.tic()
......@@ -246,7 +246,7 @@ class TestVanillaERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(portal_catalog.getHotReindexingState(), HOT_REINDEXING_FINISHED_STATE)
# Check Security UID object exist in roles and users
# compare the number object in the catalog
self.assertItemsEqual(
self.assertCountEqual(
self.getSQLPathList(original_connection_id),
self.getSQLPathListWithRolesAndUsers(original_connection_id),
)
......
......@@ -791,7 +791,7 @@ class TestOAuth2(ERP5TypeTestCase):
refresh_token_lifespan = oauth2_client_declaration_value.getRefreshTokenLifespan()
# Sanity check: there must be no valid OAuth2 session for the test user
self.assertItemsEqual(
self.assertCountEqual(
self.__searchOAuth2Session(
select_list=['creation_date', 'title']
).dictionaries(),
......@@ -995,7 +995,7 @@ class TestOAuth2(ERP5TypeTestCase):
portal_path = portal.getPath()
portal_url = portal.absolute_url() + '/'
# Sanity check: there must be no valid OAuth2 session for the test user
self.assertItemsEqual(
self.assertCountEqual(
self.__searchOAuth2Session(
select_list=['creation_date', 'title']
).dictionaries(),
......
......@@ -25,10 +25,7 @@
#
##############################################################################
from __future__ import print_function
from test import pystone
from time import time
pystone.clock = time
from erp5.component.test.testPerformance import TestPerformanceMixin
from Products.ERP5Type.Core.Workflow import ValidationFailed
from Testing import ZopeTestCase
......@@ -72,8 +69,6 @@ class TestWorkflowPerformance(TestPerformanceMixin):
foo.getSimulationState()
end = time()
print("\n%s pystones/second" % pystone.pystones()[1])
message = "\n%s took %.4gs (%s foo(s))" % (self._testMethodName,
end - start, foo_count)
print(message)
......
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
import transaction
from io import FileIO
import os
class FileUpload(file):
class FileUpload(FileIO):
"""Act as an uploaded file.
"""
__allow_access_to_unprotected_subobjects__ = 1
def __init__(self, path, name):
self.filename = name
file.__init__(self, path)
super(FileUpload, self).__init__(path)
self.headers = {}
......
......@@ -28,9 +28,7 @@
from __future__ import print_function
import subprocess
import unittest
from test import pystone
from time import time
pystone.clock = time
from Products.ERP5Type.tests.runUnitTest import ERP5TypeTestLoader
from erp5.component.test.testTradeModelLine import TestTradeModelLineSale
......@@ -58,7 +56,6 @@ class TestSimulationPerformance(TestTradeModelLineSale):
self.test_01_OrderWithSimpleTaxedAndDiscountedLines()
self.__class__._order = self['order'].getRelativeUrl()
self.runAlarms()
print("\n%s pystones/second" % pystone.pystones()[1])
def perf_01_invoiceSimpleOrder(self, order_count=1):
start = time()
......
......@@ -156,7 +156,7 @@ class TestDomainTool(TestPredicateMixIn):
searchPredicateList = self.getDomainTool().searchPredicateList
def assertPredicateItemsMatchingOrderLineEqual(expected, **kw):
self.tic()
self.assertItemsEqual(
self.assertCountEqual(
expected,
searchPredicateList(order_line, test=test, **kw),
)
......@@ -410,7 +410,7 @@ class TestDomainTool(TestPredicateMixIn):
self.assertIn('LEFT JOIN', src)
else:
self.assertNotIn('LEFT JOIN', src)
self.assertItemsEqual(expected, searchPredicateList(**kw))
self.assertCountEqual(expected, searchPredicateList(**kw))
# Check left join mode
assertUsesLeftJoinAndPredicateItemsMatchingOrderLineEqual(True, [supply1_line1])
......
......@@ -202,7 +202,7 @@ class TestGUISecurity(ERP5TypeTestCase):
self.assertEqual(bar_1.getTranslatedValidationStateTitle(), 'Invalidated')
self.assertEqual(bar_2.getTranslatedValidationStateTitle(), 'Validated')
self.tic()
self.assertItemsEqual(
self.assertCountEqual(
portal.portal_catalog(
select_list=['translated_validation_state_title'],
uid=[
......
......@@ -328,7 +328,7 @@ class TestUpgrader(ERP5TypeTestCase):
def stepCheckNoActivitiesCreated(self, sequence=None):
portal_activities = self.getActivityTool()
message_list = portal_activities.getMessageList()
self.assertItemsEqual(['Alarm_runUpgrader', 'notify'],
self.assertCountEqual(['Alarm_runUpgrader', 'notify'],
[x.method_id for x in message_list])
getTitleList = self.getTemplateTool().getInstalledBusinessTemplateTitleList
self.assertNotIn('erp5_web', getTitleList())
......
......@@ -77,7 +77,7 @@ class TestSFTPConnection(ERP5TypeTestCase):
self.connection.putFile("first_file", "first file content ( a bit bigger )")
self.connection.putFile("second_file", "second file content")
# by default, ordering is not specified
self.assertItemsEqual(
self.assertCountEqual(
["first_file", "second_file"],
self.connection.listFiles(".")
)
......@@ -94,9 +94,9 @@ class TestSFTPConnection(ERP5TypeTestCase):
def test_create_remove_directory(self):
self.connection.createDirectory("foo")
self.assertItemsEqual(["foo"], self.connection.listFiles("."))
self.assertCountEqual(["foo"], self.connection.listFiles("."))
self.connection.removeDirectory("foo")
self.assertItemsEqual([], self.connection.listFiles("."))
self.assertCountEqual([], self.connection.listFiles("."))
else:
def test_no_SFTP_URL_in_environ(self):
......
......@@ -1342,7 +1342,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(len(activity_tool.getMessageList()), 2)
activity_tool.distribute()
# After distribute, duplicate is still present.
self.assertItemsEqual([uid1, uid2],
self.assertCountEqual([uid1, uid2],
[x.uid for x in self.getMessageList(activity)])
activity_tool.tic()
......@@ -1372,7 +1372,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activate('a')
self.commit()
# Both activities are queued
self.assertItemsEqual(
self.assertCountEqual(
getMessageList(),
[
('a', -1),
......@@ -1384,7 +1384,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Note: this specific test implmeentation relies on the absence of
# validation-time deduplication which is not strictly related to
# serialization_tag behaviour.
self.assertItemsEqual(
self.assertCountEqual(
getMessageList(),
[
('a', 0),
......@@ -1396,7 +1396,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activate('b')
self.commit()
# 3rd & 4th activities queued
self.assertItemsEqual(
self.assertCountEqual(
getMessageList(),
[
('a', 0),
......@@ -1407,7 +1407,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
)
activity_tool.distribute()
# 3rd activity does not get validated, 4th is validated
self.assertItemsEqual(
self.assertCountEqual(
getMessageList(),
[
('a', 0),
......@@ -2663,7 +2663,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
does_not_exist = 'baz'
# Family declaration API
self.assertItemsEqual(activity_tool.getFamilyNameList(), [])
self.assertCountEqual(activity_tool.getFamilyNameList(), [])
self.assertRaises(
ValueError,
activity_tool.createFamily, 'same', # Reserved name
......@@ -2694,39 +2694,39 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Silent success
activity_tool.deleteFamily(member)
activity_tool.createFamily(non_member)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [other, non_member])
self.assertCountEqual(activity_tool.getFamilyNameList(), [other, non_member])
# API for node a-/di-ssociation with/from families
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [])
activity_tool.addNodeToFamily(node_id, other)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
# Silent success
activity_tool.addNodeToFamily(node_id, other)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
activity_tool.addNodeToFamily(node_id, non_member)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other, non_member])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other, non_member])
activity_tool.removeNodeFromFamily(node_id, non_member)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
# Silent success
activity_tool.removeNodeFromFamily(node_id, non_member)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
activity_tool.createFamily(does_not_exist)
activity_tool.addNodeToFamily(node_id, does_not_exist)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other, does_not_exist])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other, does_not_exist])
activity_tool.deleteFamily(does_not_exist)
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
self.assertItemsEqual(activity_tool.getFamilyNameList(), [other, non_member])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [other])
self.assertCountEqual(activity_tool.getFamilyNameList(), [other, non_member])
activity_tool.renameFamily(other, member)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member])
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member])
self.assertCountEqual(activity_tool.getFamilyNameList(), [member, non_member])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member])
activity_tool.createFamily(other)
activity_tool.addNodeToFamily(node_id, other)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member, other])
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member, other])
self.assertCountEqual(activity_tool.getFamilyNameList(), [member, non_member, other])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member, other])
activity_tool.deleteFamily(other)
self.assertItemsEqual(activity_tool.getFamilyNameList(), [member, non_member])
self.assertItemsEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member])
self.assertCountEqual(activity_tool.getFamilyNameList(), [member, non_member])
self.assertCountEqual(activity_tool.getCurrentNodeFamilyNameSet(), [member])
o = self.getOrganisation()
for activity in 'SQLDict', 'SQLQueue':
# Sanity check.
......@@ -2765,7 +2765,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
organisation.activate(tag='3', after_tag='foo').getId()
self.commit()
activity_tool.getMessageList()
self.assertItemsEqual(
self.assertCountEqual(
[('1', 0), ('2', 0), ('3', -1)],
[
(x.activity_kw['tag'], x.processing_node)
......@@ -2828,8 +2828,8 @@ return [x.getObject() for x in context.portal_catalog(limit=100)]
self.assertIs(type(aq_base(user)), PropertiedUser)
self.assertEqual(aq_parent(user), aq_parent(artificial_user))
self.assertEqual(user.getId(), artificial_user.getId())
self.assertItemsEqual(user.getGroups(), artificial_user.getGroups())
self.assertItemsEqual(user.getRoles(), artificial_user.getRoles())
self.assertCountEqual(user.getGroups(), artificial_user.getGroups())
self.assertCountEqual(user.getRoles(), artificial_user.getRoles())
Organisation.checkUserGroupAndRole = checkUserGroupAndRole
try:
newSecurityManager(None, artificial_user)
......
......@@ -331,7 +331,7 @@ class TestCMFCategory(ERP5TypeTestCase):
person = module.newContent(portal_type='Person')
address = person.newContent(portal_type='Address')
# Non-strict, implicit base category
self.assertItemsEqual(
self.assertCountEqual(
getCategoryParentUidList(
relative_url=cat2.getRelativeUrl(),
),
......@@ -340,7 +340,7 @@ class TestCMFCategory(ERP5TypeTestCase):
(cat1.getUid(), basecat.getUid(), 0),
],
)
self.assertItemsEqual(
self.assertCountEqual(
getCategoryParentUidList(
relative_url=cat22.getRelativeUrl(),
),
......@@ -351,7 +351,7 @@ class TestCMFCategory(ERP5TypeTestCase):
],
)
# Non-canonical path
self.assertItemsEqual(
self.assertCountEqual(
getCategoryParentUidList(
relative_url=cat2.getRelativeUrl() + '/' + cat3.getId(),
),
......@@ -362,7 +362,7 @@ class TestCMFCategory(ERP5TypeTestCase):
],
)
# Strict, implicit base category
self.assertItemsEqual(
self.assertCountEqual(
getCategoryParentUidList(
relative_url=cat2.getRelativeUrl(),
strict=True,
......@@ -372,7 +372,7 @@ class TestCMFCategory(ERP5TypeTestCase):
],
)
# Non-strict, explicit base category
self.assertItemsEqual(
self.assertCountEqual(
getCategoryParentUidList(
relative_url=cat2.getRelativeUrl(),
base_category=basecat2.getId(),
......@@ -384,7 +384,7 @@ class TestCMFCategory(ERP5TypeTestCase):
],
)
# Strict, explicit base category
self.assertItemsEqual(
self.assertCountEqual(
getCategoryParentUidList(
relative_url=cat2.getRelativeUrl(),
base_category=basecat2.getId(),
......@@ -396,7 +396,7 @@ class TestCMFCategory(ERP5TypeTestCase):
)
# Non-strict with a non-category relation: only strict relation uid.
# Note: not providing base_category is undefined behaviour.
self.assertItemsEqual(
self.assertCountEqual(
getCategoryParentUidList(
relative_url=person.getRelativeUrl(),
base_category=basecat.getId(),
......@@ -406,7 +406,7 @@ class TestCMFCategory(ERP5TypeTestCase):
],
)
# ... even on a subobject
self.assertItemsEqual(
self.assertCountEqual(
getCategoryParentUidList(
relative_url=address.getRelativeUrl(),
base_category=basecat.getId(),
......@@ -617,18 +617,18 @@ class TestCMFCategory(ERP5TypeTestCase):
pc = self.getCategoriesTool()
bc = pc.newContent(portal_type='Base Category', id='related_value_test')
self.tic()
self.assertItemsEqual(pc.getRelatedValueList(bc), [bc])
self.assertCountEqual(pc.getRelatedValueList(bc), [bc])
c1 = bc.newContent(portal_type='Category', id='1')
self.tic()
self.assertItemsEqual(pc.getRelatedValueList(bc), [bc])
self.assertItemsEqual(pc.getRelatedValueList(c1), [c1])
self.assertCountEqual(pc.getRelatedValueList(bc), [bc])
self.assertCountEqual(pc.getRelatedValueList(c1), [c1])
c11 = c1.newContent(portal_type='Category', id='1')
self.tic()
self.assertItemsEqual(pc.getRelatedValueList(bc), [bc])
self.assertItemsEqual(pc.getRelatedValueList(c1), [c1, c11])
self.assertItemsEqual(pc.getRelatedValueList(c11), [c11])
self.assertCountEqual(pc.getRelatedValueList(bc), [bc])
self.assertCountEqual(pc.getRelatedValueList(c1), [c1, c11])
self.assertCountEqual(pc.getRelatedValueList(c11), [c11])
#test _getDefaultRelatedProperty Accessor
person = self.portal.person_module.newContent(id='person_test')
......@@ -675,7 +675,7 @@ class TestCMFCategory(ERP5TypeTestCase):
# or if we set other categories
europe.setCategoryList(['subordination/person_module'])
self.assertItemsEqual(
self.assertCountEqual(
['region/europe', 'subordination/person_module'],
europe.getCategoryList())
self.assertEqual(
......
......@@ -1304,7 +1304,7 @@ class _TestLocalRoleManagementMixIn(object):
user_id = person_value.getUserId()
getUserById = self.portal.acl_users.getUserById
def assertRoleItemsEqual(expected_role_set):
self.assertItemsEqual(getUserById(user_id).getGroups(), expected_role_set)
self.assertCountEqual(getUserById(user_id).getGroups(), expected_role_set)
# check if assignment change is effective immediately
assertRoleItemsEqual(['F1_G1_S1'])
self.login()
......
......@@ -49,6 +49,9 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import (
)
from glob import glob
import transaction
import six
if not six.PY2:
from importlib import reload
from zLOG import LOG, DEBUG, INFO
......
......@@ -674,15 +674,6 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase, functional.F
self.assertEqual(method(), reference_workflow_state)
return workflow_error_message
# BBB backport methods from python3.
# We use this tricky getattr syntax so that lib2to3.fixers.fix_asserts
# do not fix this code.
if six.PY2:
def assertRaisesRegex(self, *args, **kwargs):
return getattr(self, 'assertRaisesRegexp')(*args, **kwargs)
def assertRegex(self, *args, **kwargs):
return getattr(self, 'assertRegexpMatches')(*args, **kwargs)
def stepPdb(self, sequence=None, sequence_list=None):
"""Invoke debugger"""
try: # try ipython if available
......
......@@ -52,7 +52,8 @@ class ERP5TypeTestSuite(TestSuite):
args = ("--firefox_bin=%s" % firefox_bin,) + args
if xvfb_bin:
args = ("--xvfb_bin=%s" % xvfb_bin,) + args
if 'testUpgradeInstanceWithOldDataFs' in args:
if ('testUpgradeInstanceWithOldDataFs' in args
or 'testUpgradeInstanceWithOldDataFsLegacyWorkflow' in args):
# our reference Data.fs uses `CONNECTION_STRING_REPLACED_BY_TEST_INIT_______________________________`
# as a connection string. Before we start, replace this by the connection string
# that this test node is using.
......@@ -148,7 +149,7 @@ class SavedTestSuite(ERP5TypeTestSuite):
def __init__(self, *args, **kw):
# Use same portal id for all tests run by current instance
# but keep it (per-run) random.
self._portal_id = 'portal_%i' % (random.randint(0, sys.maxint), )
self._portal_id = 'portal_%i' % (random.randint(0, sys.maxsize), )
self._setup_failed = False
super(SavedTestSuite, self).__init__(*args, **kw)
......
......@@ -12,7 +12,15 @@ def patch():
import six
import traceback
from unittest import TextTestResult, TextTestRunner
from unittest import TestCase, TextTestResult, TextTestRunner
# backport methods from python3.
if six.PY2:
# We use this tricky getattr syntax so that lib2to3.fixers.fix_asserts
# do not fix this code.
TestCase.assertRaisesRegex = getattr(TestCase, 'assertRaisesRegexp')
TestCase.assertRegex = getattr(TestCase, 'assertRegexpMatches')
TestCase.assertCountEqual = TestCase.assertItemsEqual
TextTestResult_addError = six.get_unbound_function(TextTestResult.addError)
def addError(self, test, err):
......
......@@ -11,6 +11,7 @@ import signal
import shutil
import errno
import random
import six
import transaction
import warnings
from glob import glob
......@@ -214,14 +215,13 @@ def initializeInstanceHome(tests_framework_home,
else:
os.symlink(src, d)
d = 'custom_zodb.py'
if not os.path.exists(d):
src = os.path.join(tests_framework_home, d)
if os.path.islink(d):
os.remove(d)
if WIN:
shutil.copy(src, d)
else:
os.symlink(src, d)
src = os.path.join(tests_framework_home, d)
if os.path.exists(d):
os.remove(d)
if WIN:
shutil.copy(src, d)
else:
os.symlink(src, d)
finally:
os.chdir(old_pwd)
......@@ -288,6 +288,15 @@ class ERP5TypeTestLoader(unittest.TestLoader):
lambda self: self._testMethodPrefix,
lambda self, value: None)
if six.PY3:
def __init__(self):
# override without call super() to avoid RecursionError in Python 3.
# super().__init__()
self.errors = []
# Tracks packages which we have called into via load_tests, to
# avoid infinite re-entrancy.
self._loading_packages = set()
def _importZodbTestComponent(self, name):
import erp5.component.test
module = __import__('erp5.component.test.' + name,
......
......@@ -33,7 +33,7 @@ import urllib
import httplib
class TestUpgradeInstanceWithOldDataFs(ERP5TypeTestCase):
class TestUpgradeInstanceWithOldDataFsWithLegacyWorkflow(ERP5TypeTestCase):
def getBusinessTemplateList(self):
return ('erp5_core_proxy_field_legacy',
......@@ -155,5 +155,5 @@ class TestUpgradeInstanceWithOldDataFs(ERP5TypeTestCase):
def test_suite():
suite = unittest.TestSuite()
if WITH_LEGACY_WORKFLOW:
suite.addTest(unittest.makeSuite(TestUpgradeInstanceWithOldDataFs))
suite.addTest(unittest.makeSuite(TestUpgradeInstanceWithOldDataFsWithLegacyWorkflow))
return suite
......@@ -57,6 +57,12 @@ import pytz
import six
import lxml.html
if six.PY2:
FileIO = file
else:
from io import FileIO
from importlib import reload
def canonical_html(html):
# type: (str) -> str
......@@ -68,7 +74,7 @@ def canonical_html(html):
).decode('utf-8')
class FileUpload(file):
class FileUpload(FileIO):
"""Act as an uploaded file.
"""
__allow_access_to_unprotected_subobjects__ = 1
......@@ -76,9 +82,10 @@ class FileUpload(file):
if name is None:
name = os.path.basename(path)
self.filename = name
file.__init__(self, path)
FileIO.__init__(self, path)
self.headers = {}
# dummy objects
class DummyMailHostMixin(object):
"""Dummy Mail Host that doesn't really send messages and keep a copy in
......
......@@ -105,7 +105,7 @@ class ERP5(_ERP5):
return self.runUnitTest('--load', '--save', '--with_wendelin_core', full_test)
elif test.startswith('testFunctional'):
return self._updateFunctionalTestResponse(self.runUnitTest(full_test))
elif test == 'testUpgradeInstanceWithOldDataFs':
elif test.startswith('testUpgradeInstanceWithOldDataFs'):
old_data_path = None
for path in sys.path:
if path.endswith('/erp5-bin'):
......@@ -151,13 +151,13 @@ class ERP5(_ERP5):
"""
# Parse relevant information to update response information
try:
summary, html_test_result = status_dict['stderr'].split("-"*79)[1:3]
summary, html_test_result = status_dict['stderr'].split(b"-"*79)[1:3]
except ValueError:
# In case of error when parse the file, preserve the original
# information. This prevents we have unfinished tests.
return status_dict
status_dict['html_test_result'] = html_test_result
search = self.FTEST_PASS_FAIL_RE.search(summary)
search = self.FTEST_PASS_FAIL_RE.search(summary.decode())
if search:
group_dict = search.groupdict()
status_dict['failure_count'] = int(group_dict['failures']) \
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment