# -*- coding: utf-8 -*- ############################################################################## # # Copyright (c) 2004, 2005, 2006 Nexedi SARL and Contributors. # All Rights Reserved. # Sebastien Robin <seb@nexedi.com> # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsability of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # garantees and support are strongly adviced to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ############################################################################## import os import threading import time import unittest import urllib import transaction from DateTime import DateTime from Testing import ZopeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.utils import createZODBPythonScript class TestInvalidationBug(ERP5TypeTestCase): def getTitle(self): return "Invalidation Bug" def getBusinessTemplateList(self): """ """ return ('erp5_base',) def afterSetUp(self): self.login() def testCommitOrder(self): """Check order of resources being committed""" module = self.portal.organisation_module organisation = module.newContent() # modify ZODB and create activity organisation.immediateReindexObject() # modify catalog path = organisation.getPath() test_list = [] for connection_id, table in (('erp5_sql_connection', 'catalog'), ('cmf_activity_sql_connection', 'message')): conn_class = self.portal[connection_id].__class__ conn_string = self.portal[connection_id].connection_string connection = conn_class('_' + connection_id, '', '-' + conn_string).__of__(self.portal) query = "rollback\0select * from %s where path='%s'" % (table, path) test_list.append(lambda manage_test=connection.manage_test, query=query: len(manage_test(query))) result_list = [map(apply, test_list)] Transaction_commitResources = transaction.Transaction._commitResources connection = module._p_jar def _commitResources(self): def tpc_finish(rm, txn): rm.__class__.tpc_finish(rm, txn) result_list.append(None if rm is connection else map(apply, test_list)) try: for rm in self._resources: rm.tpc_finish = lambda txn, rm=rm: tpc_finish(rm, txn) return Transaction_commitResources(self) finally: for rm in self._resources: del rm.tpc_finish try: transaction.Transaction._commitResources = _commitResources self.commit() finally: transaction.Transaction._commitResources = Transaction_commitResources self.tic() # Whether ZODB should be committed before or after catalog is not obvious. # Current behaviour is required to avoid creating duplicated applied rules. self.assertEqual(result_list[0], [0,0]) self.assertEqual(result_list[1], [0,0]) # activity buffer first self.assertEqual(result_list[-3], [1,0]) # catalog self.assertEqual(result_list[-2], None) # ZODB self.assertEqual(result_list[-1], [1,1]) # activity tables last def testLateInvalidationFromZEO(self): ### Check unit test is run properly from ZEO.ClientStorage import ClientStorage storage = self.portal._p_jar._storage activity_tool = self.portal.portal_activities node_list = list(activity_tool.getProcessingNodeList()) node_list.remove(activity_tool.getCurrentNode()) assert node_list and isinstance(storage, ClientStorage), \ "this unit test must be run with at least 2 ZEO clients" ### Prepare unit test, to minimize amount of work during critical section ## url to create some content using another zope new_content_url = "http://ERP5TypeTestCase:@%s%s/Folder_create" % ( node_list[0], self.portal.organisation_module.getPath()) ## prepare freeze/unfreeze of ZEO storage zeo_connection = storage._connection socket_map = zeo_connection._map freeze_lock = threading.Lock() freeze_lock.acquire() def unfreezeStorage(): socket_map[zeo_connection.fileno()] = zeo_connection # wake up asyncore loop to take the new socket into account zeo_connection.trigger.pull_trigger() # link to ZEO will be unfrozen 1 second after we read 'message' table unfreeze_timer = threading.Timer(1, unfreezeStorage) unfreeze_timer.setDaemon(True) ## prepare monkey-patches (with code to revert them) from Products.CMFActivity.Activity.SQLDict import SQLDict zeo_server = storage._server def unpatch(): storage._server = zeo_server SQLDict.getProcessableMessageList = SQLDict_getProcessableMessageList SQLDict_getProcessableMessageList = SQLDict.getProcessableMessageList def getProcessableMessageList(*args, **kw): result = SQLDict_getProcessableMessageList(*args, **kw) unpatch() unfreeze_timer.start() return result ### Perform unit test ## we should start without any pending activity self.assertNoPendingMessage() ## monkey-patch ... SQLDict.getProcessableMessageList = getProcessableMessageList try: # prevent nodes from processing activities automatically activity_tool.manage_removeFromProcessingList(node_list) self.commit() del socket_map[zeo_connection.fileno()] try: # wake up asyncore loop and wait we really woke up zeo_connection.trigger.pull_trigger(freeze_lock.release) freeze_lock.acquire() # make sure ZODB is not accessed until we get a message to process storage._server = None # ... monkey-patch done ## create object urllib.urlopen(new_content_url).read() ## validate reindex activity activity_tool.distribute() self.assertEqual(1, len(activity_tool.getMessageList())) ## reindex created object activity_tool.tic() finally: try: unfreeze_timer.join() except RuntimeError: unfreezeStorage() finally: unpatch() activity_tool.manage_addToProcessingList(node_list) self.commit() ## When the bug is not fixed, we get a -3 failed activity self.assertNoPendingMessage() def _testReindex(self): print("To reproduce bugs easily, distribution step should be skipped for" " SQLDict, by writing messages with processing_node already at 0." " This can be done easily by patching SQLDict_writeMessageList.") module = self.getPortalObject().organisation_module module.newContent() module.setIdGenerator('_generatePerDayId') #module.migrateToHBTree() self.tic() print 'OID(%s) = %r' % (module.getRelativeUrl(), module._p_oid) print ' OID(_tree) = %r' % module._tree._p_oid previous = DateTime() skin_folder = self.getPortal().portal_skins.custom if 'create_script' in skin_folder.objectIds(): skin_folder.manage_delObjects(ids=['create_script']) skin = createZODBPythonScript(skin_folder, 'create_script', '**kw', """ from Products.ERP5Type.Log import log id_list = [] for x in xrange(0, 1): organisation = context.newContent() id_list.append(organisation.getId()) log('Created Organisations', (context,id_list)) #log('All organisations', (context,[x for x in context.objectIds()])) context.activate(activity='SQLQueue', priority=2).create_script() count = len(context) log('Organisation #', count) if (count % 500) < 5: start = context.getProperty('perf_start') if start is None: context.setProperty('perf_start', (count, DateTime())) else: log('creation speed: %s obj/s' % ((count - start[0]) / (86400 * (DateTime() - start[1])))) """) for x in xrange(0,200): module.activate(activity='SQLQueue', priority=2).create_script() self.tic() def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(TestInvalidationBug)) return suite