From 0cb50e14aa33e06d457e4449606e9c438a5cb42e Mon Sep 17 00:00:00 2001
From: Julien Muchembled <jm@nexedi.com>
Date: Wed, 27 Apr 2011 15:05:43 +0000
Subject: [PATCH] Review setup of activity load balancing in ZEO-based unit
 tests

Do not initialize '/test_processing_nodes' from ZEO server anymore, because this
would not work for NEO. As a result, conflict resolution on Application is
implemented and cleanup of list of activity nodes is done at shutdown.

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@45694 20353a03-c40f-0410-a6d1-a30d3c3de9de
---
 .../ERP5Type/tests/ProcessingNodeTestCase.py  | 56 +++++++++++++++++--
 product/ERP5Type/tests/runUnitTest.py         | 13 +----
 2 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/product/ERP5Type/tests/ProcessingNodeTestCase.py b/product/ERP5Type/tests/ProcessingNodeTestCase.py
index 180ddae4a4..1b0f2e8f4c 100644
--- a/product/ERP5Type/tests/ProcessingNodeTestCase.py
+++ b/product/ERP5Type/tests/ProcessingNodeTestCase.py
@@ -1,16 +1,36 @@
 # -*- coding: utf-8 -*-
 import base64, errno, os, select, socket, sys, time
 from threading import Thread
+from UserDict import IterableUserDict
 import Lifetime
 import transaction
-from BTrees.OIBTree import OIBTree
 from Testing import ZopeTestCase
+from ZODB.POSException import ConflictError
 from zLOG import LOG, ERROR
 from Products.CMFActivity.Activity.Queue import VALIDATION_ERROR_DELAY
 from Products.ERP5Type.tests import backportUnittest
 from Products.ERP5Type.tests.utils import createZServer
 
 
+class DictPersistentWrapper(IterableUserDict, object):
+
+  def __metaclass__(name, base, d):
+    def wrap(attr):
+      wrapped = getattr(base[0], attr)
+      def wrapper(self, *args, **kw):
+        self._persistent_object._p_changed = 1
+        return wrapped(self, *args, **kw)
+      wrapper.__name__ = attr
+      return wrapper
+    for attr in ('clear', 'setdefault', 'update', '__setitem__', '__delitem__'):
+      d[attr] = wrap(attr)
+    return type(name, base, d)
+
+  def __init__(self, dict, persistent_object):
+    self.data = dict
+    self._persistent_object = persistent_object
+
+
 def patchActivityTool():
   """Redefine several methods of ActivityTool for unit tests
   """
@@ -39,8 +59,8 @@ def patchActivityTool():
   def getNodeDict(self):
     app = self.getPhysicalRoot()
     if getattr(app, 'test_processing_nodes', None) is None:
-      app.test_processing_nodes = OIBTree()
-    return app.test_processing_nodes
+      app.test_processing_nodes = {}
+    return DictPersistentWrapper(app.test_processing_nodes, app)
 
   @patch
   def getDistributingNode(self):
@@ -74,6 +94,25 @@ def patchActivityTool():
       self._orig_tic(processing_node, force)
 
 
+def Application_resolveConflict(self, old_state, saved_state, new_state):
+  """Solve conflicts in case several nodes register at the same time
+  """
+  new_state = new_state.copy()
+  old, saved, new = [set(state.pop('test_processing_nodes', {}).items())
+                     for state in old_state, saved_state, new_state]
+  if sorted(old_state.items()) != sorted(saved_state.items()):
+    raise ConflictError
+  new |= saved - old
+  new -= old - saved
+  new_state['test_processing_nodes'] = nodes = dict(new)
+  if len(nodes) != len(new):
+    raise ConflictError
+  return new_state
+
+from OFS.Application import Application
+Application._p_resolveConflict = Application_resolveConflict
+
+
 class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase):
   """Minimal ERP5 TestCase class to process activities
 
@@ -131,7 +170,16 @@ class ProcessingNodeTestCase(backportUnittest.TestCase, ZopeTestCase.TestCase):
     if processing:
       activity_tool.manage_addToProcessingList((currentNode,))
     else:
-      activity_tool.manage_removeFromProcessingList((currentNode,))
+      activity_tool.manage_delNode((currentNode,))
+
+  @classmethod
+  def unregisterNode(cls):
+    if ZopeTestCase.utils._Z2HOST is not None:
+      self = cls('unregisterNode')
+      self.app = self._app()
+      self._registerNode(distributing=0, processing=0)
+      transaction.commit()
+      self._close()
 
   def assertNoPendingMessage(self):
     """Get the last error message from error_log"""
diff --git a/product/ERP5Type/tests/runUnitTest.py b/product/ERP5Type/tests/runUnitTest.py
index 25897b29b3..8396b4effa 100755
--- a/product/ERP5Type/tests/runUnitTest.py
+++ b/product/ERP5Type/tests/runUnitTest.py
@@ -538,18 +538,6 @@ def runUnitTestList(test_list, verbosity=1, debug=0, run_only=None):
       if run_only:
         ERP5TypeTestLoader.filter_test_list = None
 
-    if not isinstance(Storage, ClientStorage):
-      # Remove nodes that were registered during previous execution.
-      # Set an empty dict (instead of delete the property)
-      # in order to avoid conflicts on / when several ZEO clients registers.
-      from BTrees.OIBTree import OIBTree
-      app = ZopeTestCase.app()
-      app.test_processing_nodes = OIBTree()
-      import transaction
-      transaction.commit()
-      ZopeTestCase.close(app)
-      del app
-
     if zeo_client_pid_list is None:
       result = suite()
     else:
@@ -558,6 +546,7 @@ def runUnitTestList(test_list, verbosity=1, debug=0, run_only=None):
       _print('done (%.3fs)\n' % (time.time() - _start))
       result = TestRunner(verbosity=verbosity).run(suite)
   finally:
+    ProcessingNodeTestCase.unregisterNode()
     Storage.close()
     if zeo_client_pid_list is not None:
       # Wait that child processes exit. Stop ZEO storage (if any) after all
-- 
2.30.9