testERP5Simulation.py 23.2 KB
Newer Older
1
# -*- coding: utf-8 -*-
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
##############################################################################
#
# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
"""
This test is experimental for new simulation implementation.
"""

import unittest
import transaction

35 36
from zLOG import LOG
from Products.CMFCore.utils import getToolByName
37 38
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.Sequence import SequenceList
39 40 41 42 43 44 45 46 47 48 49
from testPackingList import TestPackingList, TestPackingListMixin

class TestERP5SimulationMixin(TestPackingListMixin):
  def getBusinessTemplateList(self):
    return list(TestPackingListMixin.getBusinessTemplateList(self)) + \
           ['erp5_simulation',]

  def afterSetUp(self, quiet=1, run=1):
    TestPackingListMixin.afterSetUp(self, quiet, run)
    self.validateNewRules()

50 51 52 53 54
  def beforeTearDown(self):
    portal_rules = self.portal.portal_rules
    for rule in portal_rules.objectValues(portal_type='New Order Rule'):
      if rule.getValidationState() == 'validated':
        rule.invalidate()
55

56 57 58 59 60 61 62 63 64
class TestERP5Simulation(TestERP5SimulationMixin, ERP5TypeTestCase):
  run_all_test = 1
  quiet = 0

  def validateNewRules(self):
    # create a New Order Rule document.
    portal_rules = self.portal.portal_rules
    try:
      new_order_rule = filter(
65
        lambda x:x.title == 'New Simple Order Rule',
66 67 68
        portal_rules.objectValues(portal_type='New Order Rule'))[0]
    except IndexError:
      new_order_rule = portal_rules.newContent(
69
        title='New Simple Order Rule',
70 71 72 73 74
        portal_type='New Order Rule',
        reference='default_order_rule',
        version=2,
        )
      # create category divergence testers that is only used for matching
75
      for i in ('resource',):
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
        new_order_rule.newContent(
          title='%s divergence tester' % i,
          portal_type='Category Membership Divergence Tester',
          tested_property=i,
          divergence_provider=0,
          matching_provider=1)
      # create float divergence testers
      for i in ('converted_quantity',):
        new_order_rule.newContent(
          title='%s divergence tester' % i,
          portal_type='Float Divergence Tester',
          tested_property=i,
          use_delivery_ratio=1,
          quantity_range_min=-1,
          quantity_range_max=2)
    if new_order_rule.getValidationState() != 'validated':
      new_order_rule.validate()

  def _modifyPackingListLineQuantity(self, sequence=None,
      sequence_list=None, delta=0.0):
    """
    Set a increased quantity on packing list lines
    """
    packing_list = sequence.get('packing_list')
    quantity = self.default_quantity + delta
    sequence.edit(line_quantity=quantity)
    for packing_list_line in packing_list.objectValues(
        portal_type=self.packing_list_line_portal_type):
      packing_list_line.edit(quantity=quantity)
    sequence.edit(last_delta=delta)

  def stepIncreasePackingListLineQuantity2(self, sequence=None,
      sequence_list=None, **kw):
    return self._modifyPackingListLineQuantity(sequence, sequence_list, 2.0)

  def stepDecreasePackingListLineQuantity1(self, sequence=None,
      sequence_list=None, **kw):
    return self._modifyPackingListLineQuantity(sequence, sequence_list, -1.0)

  def stepDecreasePackingListLineQuantity10(self, sequence=None,
      sequence_list=None, **kw):
    return self._modifyPackingListLineQuantity(sequence, sequence_list, -10.0)

  def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None, **kw):
    """
      Do the split and defer action
    """
    packing_list = sequence.get('packing_list')
    solver_tool = self.portal.portal_solvers
    solver_process = solver_tool.newSolverProcess(packing_list)
126
    sequence.edit(solver_process=solver_process)
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
    quantity_solver_decision = filter(
      lambda x:x.getCausalityValue().getTestedProperty()=='converted_quantity',
      solver_process.contentValues())[0]
    # use Quantity Split Solver.
    quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Split Solver'])
    # configure for Quantity Split Solver.
    kw = {'delivery_solver':'FIFO',
          'start_date':packing_list.getStartDate() + 10}
    quantity_solver_decision.updateConfiguration(**kw)
    solver_process.buildTargetSolverList()
    solver_process.solve()
    # build split deliveries manually. XXX ad-hoc
    previous_tag = None
    for delivery_builder in packing_list.getBuilderList():
      this_builder_tag = '%s_split_%s' % (packing_list.getPath(),
                                          delivery_builder.getId())
      after_tag = []
      if previous_tag:
        after_tag.append(previous_tag)
      delivery_builder.activate(
        after_method_id=('solve',
                         'immediateReindexObject',
                         'recursiveImmediateReindexObject',), # XXX too brutal.
        after_tag=after_tag,
        ).build(explanation_uid=packing_list.getCausalityValue().getUid())

  def stepCheckPackingListSplitted(self, sequence=None, sequence_list=None, **kw):
    """
      Test if packing list was splitted
    """
    order = sequence.get('order')
    packing_list_list = order.getCausalityRelatedValueList(
                               portal_type=self.packing_list_portal_type)
    self.assertEquals(2,len(packing_list_list))
    packing_list1 = None
    packing_list2 = None
    for packing_list in packing_list_list:
      if packing_list.getUid() == sequence.get('packing_list').getUid():
        packing_list1 = packing_list
      else:
        packing_list2 = packing_list
    sequence.edit(new_packing_list=packing_list2)
    for line in packing_list1.objectValues(
          portal_type= self.packing_list_line_portal_type):
      self.assertEquals(self.default_quantity-10,line.getQuantity())
    for line in packing_list2.objectValues(
          portal_type= self.packing_list_line_portal_type):
      self.assertEquals(10,line.getQuantity())

176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
  def _checkSolverState(self, sequence=None, sequence_list=None,
                        state='solved'):
    """
      Check if target solvers' state.
    """
    solver_process = sequence.get('solver_process')
    for solver in solver_process.objectValues(
      portal_type=self.portal.getPortalTargetSolverTypeList()):
      self.assertEquals(state, solver.getSolverState())

  def stepCheckSolverIsSolving(self, sequence=None, sequence_list=None, **kw):
    """
      Check if all target solvers have 'solving' state.
    """
    self._checkSolverState(sequence, sequence_list, 'solving')

  def stepCheckSolverIsSolved(self, sequence=None, sequence_list=None, **kw):
    """
      Check if all target solvers have 'solved' state.
    """
    self._checkSolverState(sequence, sequence_list, 'solved')

198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
  def test_01_splitAndDefer(self, quiet=quiet, run=run_all_test):
    """
      Change the quantity on an delivery line, then
      see if the packing list is divergent and then
      split and defer the packing list
    """
    if not run: return
    sequence_list = SequenceList()

    # Test with a simply order without cell
    sequence_string = self.default_sequence + '\
                      stepIncreasePackingListLineQuantity2 \
                      stepCheckPackingListIsCalculating \
                      stepTic \
                      stepCheckPackingListIsNotDivergent \
                      stepCheckPackingListIsSolved \
                      stepDecreasePackingListLineQuantity1 \
                      stepCheckPackingListIsCalculating \
                      stepTic \
                      stepCheckPackingListIsNotDivergent \
                      stepCheckPackingListIsSolved \
                      stepDecreasePackingListLineQuantity10 \
                      stepCheckPackingListIsCalculating \
                      stepTic \
                      stepCheckPackingListIsDiverged \
                      stepSplitAndDeferPackingList \
224
                      stepCheckSolverIsSolving \
225 226 227
                      stepTic \
                      stepCheckPackingListSplitted \
                      stepCheckPackingListIsSolved \
228
                      stepCheckSolverIsSolved \
229 230 231 232 233 234
                      '
    sequence_list.addSequenceString(sequence_string)

    sequence_list.play(self, quiet=quiet)

class TestERP5SimulationPackingList(TestERP5SimulationMixin, TestPackingList):
235
  def validateNewRules(self):
236 237
    # create a New Order Rule document.
    portal_rules = self.portal.portal_rules
238 239 240 241 242
    try:
      new_order_rule = filter(
        lambda x:x.title == 'New Default Order Rule',
        portal_rules.objectValues(portal_type='New Order Rule'))[0]
    except IndexError:
243
      new_order_rule = portal_rules.newContent(
244
        title='New Default Order Rule',
245 246 247 248 249 250 251 252 253
        portal_type='New Order Rule',
        reference='default_order_rule',
        version=2,
        )
      # create category divergence testers
      for i in ('aggregate',
                'base_application',
                'base_contribution',
                'destination',
254 255 256 257 258 259 260 261 262 263
                'destination_account', # XXX-JPS - Needed ?
                'destination_function', # XXX-JPS - Needed ?
                'destination_project', # XXX-JPS - Needed ?
                'destination_section', 
                'price_currency', # XXX-JPS - Needed ?
                'source', 
                'source_account', # XXX-JPS - Needed ?
                'source_function', # XXX-JPS - Needed ?
                'source_project', # XXX-JPS - Needed ?
                'source_section',): 
264
        new_order_rule.newContent(
265
          title='%s divergence tester' % i,
266
          portal_type='Category Membership Divergence Tester',
267
          tested_property=i)
268
      # create category divergence testers that is also used for matching
269
      for i in ('resource',):
270
        new_order_rule.newContent(
271
          title='%s divergence tester' % i,
272 273 274
          portal_type='Category Membership Divergence Tester',
          tested_property=i,
          matching_provider=1)
275
      # create variation divergence testers that is also used for matching
276
      for i in ('variation_property_dict',):
277
        # tested_property has no meaning for this tester.
278
        new_order_rule.newContent(
279
          title='%s divergence tester' % i,
280
          portal_type='Variation Divergence Tester',
281 282
          tested_property=i,
          matching_provider=1)
283 284 285 286
      # create datetime divergence testers
      for i in ('start_date',
                'stop_date',):
        new_order_rule.newContent(
287
          title='%s divergence tester' % i,
288 289 290 291 292 293
          portal_type='DateTime Divergence Tester',
          tested_property=i,
          quantity=0)
      # create float divergence testers
      for i in ('quantity',):
        new_order_rule.newContent(
294
          title='%s divergence tester' % i,
295
          portal_type='Float Divergence Tester', # XXX-JPS Quantity Divergence Tester ? (ie. quantity unit)
296 297 298
          tested_property=i,
          use_delivery_ratio=1,
          quantity=0)
299
    if new_order_rule.getValidationState() != 'validated':
300 301
      new_order_rule.validate()

302 303 304 305 306
  def stepAcceptDecisionQuantity(self,sequence=None, sequence_list=None, **kw):
    """
    Solve quantity divergence by using solver tool.
    """
    packing_list = sequence.get('packing_list')
307 308 309 310 311
    solver_tool = self.portal.portal_solvers
    solver_process = solver_tool.newSolverProcess(packing_list)
    quantity_solver_decision = filter(
      lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
      solver_process.contentValues())[0]
312 313 314
    # use Quantity Accept Solver.
    quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Accept Solver'])
    solver_process.buildTargetSolverList()
315
    solver_process.solve()
316 317 318
    # XXX-JPS We do not need the divergence message anymore.
    # since the divergence message == the divergence tester itself
    # with its title, description, tested property, etc.
319 320 321 322 323 324

  def stepAcceptDecisionResource(self,sequence=None, sequence_list=None, **kw):
    """
    Solve quantity divergence by using solver tool.
    """
    packing_list = sequence.get('packing_list')
325 326 327 328 329
    solver_tool = self.portal.portal_solvers
    solver_process = solver_tool.newSolverProcess(packing_list)
    resource_solver_decision = filter(
      lambda x:x.getCausalityValue().getTestedProperty()=='resource',
      solver_process.contentValues())[0]
330 331
    # use Resource Replacement Solver.
    resource_solver_decision.setSolverValue(self.portal.portal_types['Resource Replacement Solver'])
332
    solver_process.buildTargetSolverList()
333 334
    solver_process.solve()

335 336 337 338 339
  def stepSplitAndDeferPackingList(self, sequence=None, sequence_list=None, **kw):
    """
      Do the split and defer action
    """
    packing_list = sequence.get('packing_list')
340 341 342 343 344
    solver_tool = self.portal.portal_solvers
    solver_process = solver_tool.newSolverProcess(packing_list)
    quantity_solver_decision = filter(
      lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
      solver_process.contentValues())[0]
345 346 347
    # use Quantity Split Solver.
    quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Split Solver'])
    # configure for Quantity Split Solver.
348 349 350
    kw = {'delivery_solver':'FIFO',
          'start_date':self.datetime + 15,
          'stop_date':self.datetime + 25}
351 352
    quantity_solver_decision.updateConfiguration(**kw)
    solver_process.buildTargetSolverList()
353
    solver_process.solve()
354 355 356 357 358 359 360 361 362 363 364 365 366 367
    # build split deliveries manually. XXX ad-hoc
    previous_tag = None
    for delivery_builder in packing_list.getBuilderList():
      this_builder_tag = '%s_split_%s' % (packing_list.getPath(),
                                          delivery_builder.getId())
      after_tag = []
      if previous_tag:
        after_tag.append(previous_tag)
      delivery_builder.activate(
        after_method_id=('solve',
                         'immediateReindexObject',
                         'recursiveImmediateReindexObject',), # XXX too brutal.
        after_tag=after_tag,
        ).build(explanation_uid=packing_list.getCausalityValue().getUid())
368

369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
  def _adoptPrevisionQuantity(self, packing_list):
    """
    Solve quantity divergence by using solver tool.
    """
    solver_tool = self.portal.portal_solvers
    solver_process = solver_tool.newSolverProcess(packing_list)
    quantity_solver_decision = filter(
      lambda x:x.getCausalityValue().getTestedProperty()=='quantity',
      solver_process.contentValues())[0]
    # use Quantity Adoption Solver.
    quantity_solver_decision.setSolverValue(self.portal.portal_types['Quantity Adoption Solver'])
    solver_process.buildTargetSolverList()
    solver_process.solve()

  def stepAdoptPrevisionQuantity(self,sequence=None, sequence_list=None, **kw):
    """
    Solve quantity divergence by using solver tool.
    """
    packing_list = sequence.get('packing_list')
    self._adoptPrevisionQuantity(packing_list)

  def stepNewPackingListAdoptPrevisionQuantity(self, sequence=None,
                                               sequence_list=None, **kw):
    """
    Solve quantity divergence by using solver tool.
    """
    packing_list = sequence.get('new_packing_list')
    self._adoptPrevisionQuantity(packing_list)

  def stepAdoptPrevisionResource(self,sequence=None, sequence_list=None, **kw):
    """
    Solve resource divergence by using solver tool.
    """
    packing_list = sequence.get('packing_list')
    solver_tool = self.portal.portal_solvers
    solver_process = solver_tool.newSolverProcess(packing_list)
    resource_solver_decision = filter(
      lambda x:x.getCausalityValue().getTestedProperty()=='resource',
      solver_process.contentValues())[0]
    # use Resource Adopt Solver.
    resource_solver_decision.setSolverValue(self.portal.portal_types['Resource Adoption Solver'])
    solver_process.buildTargetSolverList()
    solver_process.solve()

413 414 415 416 417 418 419 420 421 422 423 424 425 426
  def stepCheckPackingListLineWithSameResource(self,sequence=None, sequence_list=None, **kw):
    """
      Look if the packing list has new previsions
    """
    old_packing_list_line = sequence.get('packing_list_line')
    packing_list_line = old_packing_list_line.aq_parent[str(int(old_packing_list_line.getId())-1)]
    resource = sequence.get('resource')
    for line in sequence.get('packing_list').getMovementList():
      self.assertEquals(line.getResourceValue(), resource)
      self.assertEquals(line.getQuantity(), self.default_quantity)
      self.assertEquals(line.getCausalityList(),
                        [x.getOrder() for x in \
                         line.getDeliveryRelatedValueList()])

427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
  def stepUnifyDestinationWithDecision(self,sequence=None, sequence_list=None, **kw):
    """
      Check if simulation movement are disconnected
    """
    packing_list = sequence.get('packing_list')
    solver_tool = self.portal.portal_solvers
    solver_process = solver_tool.newSolverProcess(packing_list)
    for destination_solver_decision in filter(
      lambda x:x.getCausalityValue().getTestedProperty()=='destination',
      solver_process.contentValues()):
      # use Destination Replacement Solver.
      destination_solver_decision.setSolverValue(self.portal.portal_types['Destination Replacement Solver'])
    solver_process.buildTargetSolverList()
    solver_process.solve()

442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
  def stepUnifyStartDateWithDecision(self,sequence=None, sequence_list=None, **kw):
    """
      Check if simulation movement are disconnected
    """
    packing_list = sequence.get('packing_list')
    solver_tool = self.portal.portal_solvers
    solver_process = solver_tool.newSolverProcess(packing_list)
    for start_date_solver_decision in filter(
      lambda x:x.getCausalityValue().getTestedProperty()=='start_date',
      solver_process.contentValues()):
      # use StartDate Replacement Solver.
      start_date_solver_decision.setSolverValue(self.portal.portal_types['Start Date Replacement Solver'])
    solver_process.buildTargetSolverList()
    solver_process.solve()

457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533
  def checkOrderRuleSimulation(self, rule_reference, sequence=None, sequence_list=None, **kw):
    """
      Test if simulation is matching order, be sure that rule_reference is used
      to expand simulation for order
    """
    order = sequence.get('order')
    related_applied_rule_list = order.getCausalityRelatedValueList( \
                                   portal_type=self.applied_rule_portal_type)
    no_applied_rule_state = ('draft', 'auto_planned')
    order_state = order.getSimulationState()

    if order_state in no_applied_rule_state:
      self.assertEquals(0, len(related_applied_rule_list))
    else:
      LOG('stepCheckOrderRuleSimulation', 0, 'related_applied_rule_list: %s' %
                   str([x.getObject() for x in related_applied_rule_list]))
      self.assertEquals(1, len(related_applied_rule_list))
      applied_rule = related_applied_rule_list[0].getObject()
      sequence.edit(applied_rule=applied_rule)
      self.failUnless(applied_rule is not None)
      self.failUnless(order_state, \
                      applied_rule.getLastExpandSimulationState())

      # Test if applied rule has a specialise value with passed rule_reference
      portal_rules = getToolByName(order, 'portal_rules')
      self.assertEquals(rule_reference,
                        applied_rule.getSpecialiseReference())

      simulation_movement_list = applied_rule.objectValues()
      sequence.edit(simulation_movement_list=simulation_movement_list)

      # Count the number of movement in order
      movement_list = order.getMovementList()
      # Check if number of unique movement is equal to number of
      # simulation movement
      unique_movement_list = dict(
        [('%r,%r,%r' % (x.getResource(), x.getVariationCategoryList(),
          x.getVariationPropertyDict())), x] for x in movement_list).values()
      self.assertEquals(len(unique_movement_list),
                        len(simulation_movement_list))
      # Check if all movements are related to simulation movements
      order_movement_list = sum([x.getOrderValueList() for x in \
                                 simulation_movement_list], [])
      self.failIfDifferentSet(movement_list, order_movement_list)

      # Check each simulation movement
      for simulation_movement in simulation_movement_list:
        order_movement_list = simulation_movement.getOrderValueList()
        # Test quantity
        self.assertEquals(sum([x.getQuantity() for x in order_movement_list]),
                          simulation_movement.getQuantity())
        for order_movement in order_movement_list:
          # Test price
          self.assertEquals(order_movement.getPrice(), \
                            simulation_movement.getPrice())
          # Test resource
          self.assertEquals(order_movement.getResource(), \
                            simulation_movement.getResource())
          # Test resource variation
          self.assertEquals(order_movement.getVariationText(), \
                            simulation_movement.getVariationText())
          self.assertEquals(order_movement.getVariationCategoryList(), \
                            simulation_movement.getVariationCategoryList())
          # XXX Test acquisition
          self.checkAcquisition(simulation_movement, order_movement)

  def stepModifySimulationLineQuantityForMergedLine(self,sequence=None, sequence_list=None, **kw):
    """
      Check if simulation movement are disconnected
    """
    applied_rule = sequence.get('applied_rule')
    simulation_line_list = applied_rule.objectValues()
    self.assertEquals(len(simulation_line_list), 1)
    for simulation_line in simulation_line_list:
      simulation_line.edit(quantity=self.default_quantity-2)
      simulation_line.getOrderValue().edit(quantity=self.default_quantity-2)

534 535 536 537 538 539 540 541 542 543 544 545
  def stepCheckSimulationQuantityUpdatedForMergedLine(self,sequence=None, sequence_list=None, **kw):
    """
      Test if the quantity of the simulation movement was changed
    """
    applied_rule = sequence.get('applied_rule')
    simulation_line_list = applied_rule.objectValues()
    self.assertEquals(len(simulation_line_list), 1)
    for simulation_line in simulation_line_list:
      self.assertEquals(simulation_line.getQuantity() + \
                        simulation_line.getDeliveryError(),
                        self.default_quantity * 2)

546 547 548
def test_suite():
  suite = unittest.TestSuite()
  suite.addTest(unittest.makeSuite(TestERP5Simulation))
549
  suite.addTest(unittest.makeSuite(TestERP5SimulationPackingList))
550
  return suite