testIdToolUpgrade.py 12.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2008 Nexedi SARL and Contributors. All Rights Reserved.
#          Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

30 31
import unittest

32 33
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.Globals import PersistentMapping
34
from Products.ERP5Type.Utils import ScalarMaxConflictResolver
35
from BTrees.Length import Length
36
from BTrees.OOBTree import OOBTree
37 38
from zLOG import LOG

39 40

class TestIdToolUpgrade(ERP5TypeTestCase):
41 42 43 44 45 46 47 48 49 50 51 52 53
  """
  Automatic upgrade of id tool is really sensitive to any change. Therefore,
  make sure that the upgrade is still working even if there are changes.

  A specific test is used, because some really nasty things are done here.
  """

  def getTitle(self):
    """
      Title of this test case.
    """
    title = "Test Id Tool Upgrade"
    return title

54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
  def afterSetUp(self):
    """
      Log in, keep a reference to the id tool in self.id_tool, call
      initializeGenerator(all=True) on it, create the generators used
      by the tests and finally call tic().
    """
    self.login()
    # the helper methods of this class all go through self.id_tool
    portal_ids = self.portal.portal_ids
    self.id_tool = portal_ids
    portal_ids.initializeGenerator(all=True)
    self.createGenerators()
    self.tic()

  def beforeTearDown(self):
    """
      Clean up hook: call clearGenerator(all=True) on the id tool.
    """
    id_tool = self.id_tool
    id_tool.clearGenerator(all=True)

  def createGenerators(self):
    """
      Create the two chains of generators used by the tests and keep
      each document as an attribute of the test case.

      Each chain is made of an 'Application Id Generator', specialised
      by a 'Conceptual Id Generator', itself specialised by the concrete
      generator (SQL for the first chain, ZODB for the second one).
    """
    id_tool = self.id_tool

    def newGenerator(portal_type, reference):
      # all the test generators share the same version
      return id_tool.newContent(portal_type=portal_type,
                                reference=reference,
                                version='001')

    # chain ending with the SQL generator
    self.application_sql_generator = newGenerator(
      'Application Id Generator', 'test_application_sql')
    self.conceptual_sql_generator = newGenerator(
      'Conceptual Id Generator', 'test_non_continuous_increasing')
    self.sql_generator = newGenerator(
      'SQL Non Continuous Increasing Id Generator',
      'test_sql_non_continuous_increasing')
    self.application_sql_generator.setSpecialiseValue(
      self.conceptual_sql_generator)
    self.conceptual_sql_generator.setSpecialiseValue(self.sql_generator)

    # chain ending with the ZODB generator
    self.application_zodb_generator = newGenerator(
      'Application Id Generator', 'test_application_zodb')
    self.conceptual_zodb_generator = newGenerator(
      'Conceptual Id Generator', 'test_continuous_increasing')
    self.zodb_generator = newGenerator(
      'ZODB Continuous Increasing Id Generator',
      'test_zodb_continuous_increasing')
    self.application_zodb_generator.setSpecialiseValue(
      self.conceptual_zodb_generator)
    self.conceptual_zodb_generator.setSpecialiseValue(self.zodb_generator)

  def getLastGenerator(self, id_generator):
    """
      Return the generator found at the end of the specialise chain.

      id_generator -- reference of the generator the chain starts from;
                      the first result of searchFolder on the id tool
                      is used

      At every step of the chain (start, specialise value, specialise
      value of the latter) the value returned by getLatestVersionValue
      is the one which is followed.
    """
    search_result = self.id_tool.searchFolder(reference=id_generator)
    application = search_result[0].getLatestVersionValue()
    conceptual = application.getSpecialiseValue().getLatestVersionValue()
    return conceptual.getSpecialiseValue().getLatestVersionValue()

112
  def testUpgradeIdToolDicts(self):
    """
      Check that ids keep increasing from the values stored in the
      dictionaries of an old id tool, first while the generators and the
      type information are missing, then once the generators of
      erp5_core have been put back in the id tool.
    """
    # With old erp5_core, we have no generators, no IdTool_* zsql methods,
    # and we have a dictionary stored on id tool
    id_tool = self.portal.portal_ids
    # Rebuild a persistent mapping like it already existed in beginning 2010
    # First persistent mapping of generateNewLengthIdList
    id_tool.dict_length_ids = PersistentMapping()
    id_tool.dict_length_ids['foo'] = Length(5)
    id_tool.dict_length_ids['bar'] = Length(5)
    id_tool.IdTool_zSetLastId(id_group='foo', last_id=5)
    id_tool.IdTool_zSetLastId(id_group='bar', last_id=10)
    # Then persistent mapping of generateNewId
    id_tool.dict_ids = PersistentMapping()
    id_tool.dict_ids['foo'] = 3
    # it was unfortunately possible to define something else
    # than strings
    id_tool.dict_ids[('bar','baz')] = 2
    # Delete portal type info and new generators
    id_tool.manage_delObjects(ids=list(id_tool.objectIds()))
    id_tool_class = id_tool.__class__
    id_tool_class.getTypeInfo = lambda self: None
    try:
      # Test with compatibility
      self.tic()
      id_list = id_tool.generateNewLengthIdList(id_group='foo', store=1)
      self.assertEqual(id_list, [5])
      self.assertEqual(int(id_tool.dict_length_ids['foo'].value), 6)
    finally:
      # Now, restore. getTypeInfo was replaced on the class itself, so it
      # has to be removed even when something failed above, otherwise the
      # patched class would stay in place for whatever runs afterwards.
      del id_tool_class.getTypeInfo
    # and make sure we can still generate ids
    bt = self.portal.portal_templates.getInstalledBusinessTemplate('erp5_core',
                                                                  strict=True)
    for object_path, obj in bt._path_item._objects.iteritems():
      container_path, obj_id = object_path.rsplit('/', 1)
      # only the objects stored directly in portal_ids are put back
      if container_path == 'portal_ids':
        id_tool._setObject(obj_id, obj._getCopy(bt))
    self.tic()
    id_list = id_tool.generateNewLengthIdList(id_group='foo')
    # it is known that with current upgrade there is a hole
    self.assertEqual(id_list, [7])
    new_id = id_tool.generateNewId(id_group='foo')
    self.assertEqual(new_id, 4)
    new_id = id_tool.generateNewId(id_group=('bar','baz'))
    self.assertEqual(new_id, 3)
    # Make sure that the old code is not used any more, so the dic on
    # id tool should not change, checking for length_dict
    self.assertEqual(int(id_tool.dict_length_ids['foo'].value), 6)
    id_list = id_tool.generateNewLengthIdList(id_group='bar')
    self.assertEqual(id_list, [11])
    generator_list = [x for x in id_tool.objectValues()
                      if x.getReference()=='mysql_non_continuous_increasing']
    self.assertEqual(len(generator_list), 1)
    generator = generator_list[0]
    self.assertEqual(generator.last_max_id_dict['foo'].value, 7)
    self.assertEqual(generator.last_max_id_dict['bar'].value, 11)
    # Make sure that the old code is not used any more, so the dic on
    # id tool should not change, checking for dict
    self.assertEqual(id_tool.dict_ids['foo'], 3)
    generator_list = [x for x in id_tool.objectValues()
                      if x.getReference()=='zodb_continuous_increasing']
    self.assertEqual(len(generator_list), 1)
    generator = generator_list[0]
    self.assertEqual(generator.last_id_dict['foo'], 4)
    # the tuple used as id group is stored under its string representation
    self.assertEqual(generator.last_id_dict["('bar', 'baz')"], 3)
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215


  def _setUpLastMaxIdDict(self, id_generator_reference):
    def countup(id_generator, id_group, until):
      for i in xrange(until + 1):
        self.id_tool.generateNewId(id_generator=id_generator_reference,
                                   id_group=id_group)

    countup(id_generator_reference, 'A-01', 2)
    countup(id_generator_reference, 'B-01', 1)
    var_id = 'C-%04d'
    for x in xrange(self.a_lot_of_key):
      countup(id_generator_reference, var_id % x, 0)

  def _getLastIdDictName(self, id_generator):
    portal_type = id_generator.getPortalType()
    if portal_type == 'SQL Non Continuous Increasing Id Generator':
      return 'last_max_id_dict'
    elif portal_type == 'ZODB Continuous Increasing Id Generator':
      return 'last_id_dict'
    else:
      raise RuntimeError("not expected to test the generator :%s" % portal_type)

  def _getLastIdDict(self, id_generator):
    last_id_dict_name = self._getLastIdDictName(id_generator)
    return getattr(id_generator, last_id_dict_name)

  def _setLastIdDict(self, id_generator, value):
    last_id_dict_name = self._getLastIdDictName(id_generator)
    setattr(id_generator, last_id_dict_name, value)

  def _getValueFromLastIdDict(self, last_id_dict, key):
    value = last_id_dict[key]
    if isinstance(value, int):
      # in ZODB Id Generator it is stored in int
      return value
    elif isinstance(value, ScalarMaxConflictResolver):
      return value.value
    else:
      raise RuntimeError('not expected to test the value: %s' % value)

  def _assertIdGeneratorLastMaxIdDict(self, id_generator):
    """
      Check that the last ids stored on the generator are the ones
      produced by _setUpLastMaxIdDict: 2 for 'A-01', 1 for 'B-01' and 0
      for each of the self.a_lot_of_key 'C-*' groups, and that no other
      group is stored.
    """
    last_id_dict = self._getLastIdDict(id_generator)
    getValue = self._getValueFromLastIdDict
    for id_group, expected_last_id in (('A-01', 2), ('B-01', 1)):
      self.assertEqual(expected_last_id, getValue(last_id_dict, id_group))
    for index in xrange(self.a_lot_of_key):
      self.assertEqual(0, getValue(last_id_dict, 'C-%04d' % index))

    # 1(A-01) + 1(B-01) + a_lot_of_key(C-*)
    expected_group_count = self.a_lot_of_key + 2
    exported_id_dict = id_generator.exportGeneratorIdDict()
    self.assertEqual(expected_group_count, len(exported_id_dict))
    self.assertEqual(expected_group_count, len(last_id_dict))


  def _checkDataStructureMigration(self, id_generator):
    """ First, simulate previous data structure which is using
    PersistentMapping as the storage, then migrate to OOBTree.
    Then, migrate the id generator again from OOBTree to OOBTree
    just to be sure.
    For the SQL generator only, the dict is finally replaced by an empty
    OOBTree and rebuilt once more.

    id_generator -- generator whose reference is either
                    'test_sql_non_continuous_increasing' or
                    'test_zodb_continuous_increasing'

    ValueError is raised for any other reference. A generator whose
    portal type does not match its reference makes the test fail.
    self.a_lot_of_key must have been set by the caller."""
    id_generator_reference = id_generator.getReference()

    reference_portal_type_dict = {
      'test_sql_non_continuous_increasing':'SQL Non Continuous ' \
                                           'Increasing Id Generator',
      'test_zodb_continuous_increasing':'ZODB Continuous ' \
                                        'Increasing Id Generator'
    }
    # Only the lookup is protected: a failing assertion must be reported
    # as such, not turned into a misleading "reference is not valid" error.
    try:
      portal_type = reference_portal_type_dict[id_generator_reference]
    except KeyError:
      raise ValueError("reference is not valid: %s" % id_generator_reference)
    self.assertEqual(id_generator.getPortalType(), portal_type)

    self._setLastIdDict(id_generator, PersistentMapping()) # simulate previous
    last_id_dict = self._getLastIdDict(id_generator)

    # setUp the data for migration test
    self._setUpLastMaxIdDict(id_generator_reference)

    # test migration: PersistentMapping to OOBTree
    self.assertTrue(isinstance(last_id_dict, PersistentMapping))
    self._assertIdGeneratorLastMaxIdDict(id_generator)
    id_generator.rebuildGeneratorIdDict() # migrate the dict
    self._assertIdGeneratorLastMaxIdDict(id_generator)

    # test migration: OOBTree to OOBTree. this changes nothing, just to be sure
    last_id_dict = self._getLastIdDict(id_generator)
    self.assertTrue(isinstance(last_id_dict, OOBTree))
    self._assertIdGeneratorLastMaxIdDict(id_generator)
    id_generator.rebuildGeneratorIdDict() # migrate the dict
    self._assertIdGeneratorLastMaxIdDict(id_generator)

    # test migration: SQL to OOBTree
    if id_generator.getPortalType() == \
      'SQL Non Continuous Increasing Id Generator':
      self._setLastIdDict(id_generator, OOBTree()) # set empty one
      last_id_dict = self._getLastIdDict(id_generator)
      # 0 because it is empty. A real assertion is needed here:
      # "assert(len(last_id_dict), 0)" tests a tuple, which is always true.
      self.assertEqual(len(last_id_dict), 0)
      self.assertTrue(isinstance(last_id_dict, OOBTree))
      # migrate the dict totally from sql table in this case
      id_generator.rebuildGeneratorIdDict()
      self._assertIdGeneratorLastMaxIdDict(id_generator)


  def testRebuildIdDictFromPersistentMappingToOOBTree(self):
    """
      Run the data structure migration check on the generator found at
      the end of the SQL chain, then on the one found at the end of the
      ZODB chain.
    """
    # number of 'C-*' id groups created and checked by the helpers
    self.a_lot_of_key = 1010

    # sql id generator migration
    sql_generator = self.getLastGenerator('test_application_sql')
    sql_generator.setStoredInZodb(True)
    sql_generator.clearGenerator() # start without any stored data
    self._checkDataStructureMigration(sql_generator)

    # zodb id generator migration
    zodb_generator = self.getLastGenerator('test_application_zodb')
    zodb_generator.clearGenerator() # start without any stored data
    self._checkDataStructureMigration(zodb_generator)

def test_suite():
  """
    Return a test suite made of all the tests of TestIdToolUpgrade.
  """
  # unittest.makeSuite is deprecated since Python 3.11 and removed in 3.13.
  # A TestLoader builds the same suite and exists in Python 2 as well.
  suite = unittest.TestSuite()
  suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestIdToolUpgrade))
  return suite