testInvalidationBug.py 8.87 KB
Newer Older
1 2 3
# -*- coding: utf-8 -*-
##############################################################################
#
4
# Copyright (c) 2004, 2005, 2006 Nexedi SARL and Contributors.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
# All Rights Reserved.
#          Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

31 32 33
import threading
import unittest
import urllib
34
import transaction
35
import pkg_resources
36
from DateTime import DateTime
37 38 39
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript

40 41 42 43
ZEO5 = pkg_resources.parse_version(
  pkg_resources.get_distribution('ZEO').version
) >= pkg_resources.parse_version('5')

44 45 46 47 48 49 50 51 52 53 54 55 56
class TestInvalidationBug(ERP5TypeTestCase):

  def getTitle(self):
    return "Invalidation Bug"

  def getBusinessTemplateList(self):
    """
    """
    return ('erp5_base',)

  def afterSetUp(self):
    self.login()

57 58
  def testCommitOrder(self):
    """Check order of resources being committed"""
59
    module = self.portal.organisation_module
60 61 62 63 64 65
    organisation = module.newContent()    # modify ZODB and create activity
    organisation.immediateReindexObject() # modify catalog
    path = organisation.getPath()
    test_list = []
    for connection_id, table in (('erp5_sql_connection', 'catalog'),
                                 ('cmf_activity_sql_connection', 'message')):
66 67 68 69
      connection = self.portal[connection_id]
      query = connection.factory()('-' + connection.connection_string).query
      sql = "rollback\0select * from %s where path='%s'" % (table, path)
      test_list.append(lambda query=query, sql=sql: len(query(sql)[1]))
70 71
    result_list = [map(apply, test_list)]
    Transaction_commitResources = transaction.Transaction._commitResources
72
    connection = module._p_jar
73
    def _commitResources(self):
74 75 76
      def tpc_finish(rm, txn):
        rm.__class__.tpc_finish(rm, txn)
        result_list.append(None if rm is connection else map(apply, test_list))
77
      try:
78 79
        for rm in self._resources:
          rm.tpc_finish = lambda txn, rm=rm: tpc_finish(rm, txn)
80 81
        return Transaction_commitResources(self)
      finally:
82 83
        for rm in self._resources:
          del rm.tpc_finish
84 85
    try:
      transaction.Transaction._commitResources = _commitResources
86
      self.commit()
87 88
    finally:
      transaction.Transaction._commitResources = Transaction_commitResources
89
    self.tic()
90 91
    # Whether ZODB should be committed before or after catalog is not obvious.
    # Current behaviour is required to avoid creating duplicated applied rules.
92 93
    self.assertEqual(result_list[0], [0,0])
    self.assertEqual(result_list[1], [0,0])  # activity buffer first
94 95
    self.assertEqual(result_list[-3], [1,0]) # catalog
    self.assertEqual(result_list[-2], None)  # ZODB
96 97 98 99 100
    result_catalog_count, result_activity_count = result_list[-1]
    # activity tables last (there may be multiple activities, but there must be
    # at least one).
    self.assertEqual(result_catalog_count, 1)
    self.assertGreaterEqual(result_activity_count, 1)
101

102
  @unittest.skipIf(ZEO5, "Covered upstream on ZEO>=5")
103 104 105 106
  def testLateInvalidationFromZEO(self):
    ### Check unit test is run properly
    from ZEO.ClientStorage import ClientStorage
    storage = self.portal._p_jar._storage
107 108 109 110 111
    self.assertIsInstance(
        storage,
        ClientStorage,
        "This test must be run with ZEO storage")
    node_list = self.getOtherZopeNodeList()
112 113 114
    activity_tool = self.portal.portal_activities

    ### Prepare unit test, to minimize amount of work during critical section
115 116
    ## make sure activity tool's OOBTree for family mapping is loaded before the test
    _ = activity_tool.getCurrentNodeFamilyIdSet()
117
    ## url to create some content using another zope
118
    new_content_url = "http://ERP5TypeTestCase:@%s%s/Folder_create" % (
119 120 121
      node_list[0], self.portal.organisation_module.getPath())
    ## prepare freeze/unfreeze of ZEO storage
    zeo_connection = storage._connection
122
    socket_map = zeo_connection._map
123 124 125 126 127
    freeze_lock = threading.Lock()
    freeze_lock.acquire()
    def unfreezeStorage():
      socket_map[zeo_connection.fileno()] = zeo_connection
      # wake up asyncore loop to take the new socket into account
128
      zeo_connection.trigger.pull_trigger()
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
    # link to ZEO will be unfrozen 1 second after we read 'message' table
    unfreeze_timer = threading.Timer(1, unfreezeStorage)
    unfreeze_timer.setDaemon(True)
    ## prepare monkey-patches (with code to revert them)
    from Products.CMFActivity.Activity.SQLDict import SQLDict
    zeo_server = storage._server
    def unpatch():
      storage._server = zeo_server
      SQLDict.getProcessableMessageList = SQLDict_getProcessableMessageList
    SQLDict_getProcessableMessageList = SQLDict.getProcessableMessageList
    def getProcessableMessageList(*args, **kw):
      result = SQLDict_getProcessableMessageList(*args, **kw)
      unpatch()
      unfreeze_timer.start()
      return result

    ### Perform unit test
    ## we should start without any pending activity
    self.assertNoPendingMessage()
    ## monkey-patch ...
    SQLDict.getProcessableMessageList = getProcessableMessageList
    try:
      # prevent nodes from processing activities automatically
      activity_tool.manage_removeFromProcessingList(node_list)
153
      self.commit()
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
      del socket_map[zeo_connection.fileno()]
      try:
        # wake up asyncore loop and wait we really woke up
        zeo_connection.trigger.pull_trigger(freeze_lock.release)
        freeze_lock.acquire()
        # make sure ZODB is not accessed until we get a message to process
        storage._server = None
        # ... monkey-patch done
        ## create object
        urllib.urlopen(new_content_url).read()
        ## validate reindex activity
        activity_tool.distribute()
        self.assertEqual(1, len(activity_tool.getMessageList()))
        ## reindex created object
        activity_tool.tic()
      finally:
        try:
          unfreeze_timer.join()
        except RuntimeError:
          unfreezeStorage()
    finally:
      unpatch()
      activity_tool.manage_addToProcessingList(node_list)
177
      self.commit()
178
    ## When the bug is not fixed, we get a failed activity
179 180
    ## which would cause tic to fail
    self.tic()
181

182
  def _testReindex(self):
183 184 185
    print("To reproduce bugs easily, distribution step should be skipped for"
          " SQLDict, by writing messages with processing_node already at 0."
          " This can be done easily by patching SQLDict_writeMessageList.")
186
    module = self.getPortalObject().organisation_module
187
    module.newContent()
188
    module.setIdGenerator('_generatePerDayId')
189
    #module.migrateToHBTree()
190
    self.tic()
191 192
    print('OID(%s) = %r' % (module.getRelativeUrl(), module._p_oid))
    print('  OID(_tree) = %r' % module._tree._p_oid)
193 194 195 196 197 198
    previous = DateTime()
    skin_folder = self.getPortal().portal_skins.custom
    if 'create_script' in skin_folder.objectIds():
      skin_folder.manage_delObjects(ids=['create_script'])
    skin = createZODBPythonScript(skin_folder, 'create_script', '**kw',
        """
199
from erp5.component.module.Log import log
200 201 202 203 204 205
id_list = []
for x in xrange(0, 1):
  organisation = context.newContent()
  id_list.append(organisation.getId())
log('Created Organisations', (context,id_list))
#log('All organisations', (context,[x for x in context.objectIds()]))
206
context.activate(activity='SQLQueue', priority=2).create_script()
207 208 209

count = len(context)
log('Organisation #', count)
210 211 212 213
if (count % 500) < 5:
  start = context.getProperty('perf_start')
  if start is None:
    context.setProperty('perf_start', (count, DateTime()))
214
  else:
215 216
    log('creation speed: %s obj/s' % ((count - start[0]) /
        (86400 * (DateTime() - start[1]))))
217 218
""")
    for x in xrange(0,200):
219
      module.activate(activity='SQLQueue', priority=2).create_script()
220 221 222 223 224 225
    self.tic()

def test_suite():
  suite = unittest.TestSuite()
  suite.addTest(unittest.makeSuite(TestInvalidationBug))
  return suite