Commit 5a5cc985 authored by Hardik Juneja's avatar Hardik Juneja

fix erp5 type live testing suite

parent 5cf1ee58
......@@ -32,6 +32,7 @@ import sys
import imp
import re
import thread
import weakref
from Testing import ZopeTestCase
from Testing.ZopeTestCase import PortalTestCase, user_name
......@@ -42,6 +43,7 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import \
ERP5TypeTestCaseMixin, ERP5TypeTestCase
from glob import glob
import transaction
from functools import wraps
from zLOG import LOG, DEBUG, INFO
......@@ -49,7 +51,80 @@ from zLOG import LOG, DEBUG, INFO
# Tic doesn't need help as TimerService is running
from Products.ERP5Type.tests import ProcessingNodeTestCase as\
ProcessingNodeTestCaseModule
ProcessingNodeTestCaseModule.patchActivityTool = lambda: None
class Patch(object):
"""
Patch attributes and revert later automatically.
Usage:
with Patch(someObject, attrToPatch=newValue, [otherAttr=...]) as patch:
[... code that runs with patches ...]
[... code that runs without patch ...]
' as patch' is optional: 'patch.revert()' can be used to revert patches
in the middle of the 'with' clause.
"""
applied = False
def __new__(cls, patched, **patch):
if patch:
return object.__new__(cls)
def patch(func):
self = cls(patched, **{func.__name__: func})
self.apply()
return self
return patch
def __init__(self, patched, **patch):
(name, patch), = patch.iteritems()
self._patched = patched
self._name = name
if callable(patch):
wrapped = getattr(patched, name, None)
func = patch
patch = lambda *args, **kw: func(wrapped, *args, **kw)
if callable(wrapped):
patch = wraps(wrapped)(patch)
self._patch = patch
try:
orig = patched.__dict__[name]
self._revert = lambda: setattr(patched, name, orig)
except KeyError:
self._revert = lambda: delattr(patched, name)
def apply(self):
assert not self.applied
setattr(self._patched, self._name, self._patch)
self.applied = True
def revert(self):
del self.applied
self._revert()
def __del__(self):
if self.applied:
self.revert()
def __enter__(self):
self.apply()
return weakref.proxy(self)
def __exit__(self, t, v, tb):
self.__del__()
from Products.CMFActivity.ActivityTool import ActivityTool
def tic(self, orig, processing_node=1, force=0):
#import pdb; pdb.set_trace();
currentNode = self.getCurrentNode()
processing_node_list = self.getProcessingNodeList()
if currentNode in processing_node_list:
orig(processing_node, force)
else:
# Sleep between each distribute.
time.sleep(0.3)
transaction.begin()
_request_server_url = None
......@@ -239,7 +314,11 @@ def runLiveTest(test_list, verbosity=1, stream=None, request_server_url=None, **
# original SecurityManager is restored
from AccessControl.SecurityManagement import getSecurityManager, setSecurityManager
sm = getSecurityManager()
try:
result = TestRunner(stream=output, verbosity=verbosity).run(suite)
finally:
setSecurityManager(sm)
with Patch(ActivityTool, tic=tic) as patch:
try:
# patch
result = TestRunner(stream=output, verbosity=verbosity).run(suite)
finally:
setSecurityManager(sm)
# unpatch
patch.revert()
......@@ -222,6 +222,7 @@ class ProcessingNodeTestCase(ZopeTestCase.TestCase):
getMessageList = portal_activities.getMessageList
message_list = getMessageList()
message_count = len(message_list)
import pdb; pdb.set_trace()
while message_count and not stop_condition(message_list):
if verbose and old_message_count != message_count:
ZopeTestCase._print(' %i' % message_count)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment