Commit bdb7ee66 authored by Jérome Perrin's avatar Jérome Perrin

SimulationTool: implement "linear" flow API

Conflicts:
	product/ERP5/bootstrap/erp5_core/SkinTemplateItem/portal_skins/erp5_core/Resource_zGetInventoryList.xml
parent 5016f968
This source diff could not be displayed because it is too large. You can view the blob instead.
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Test Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testInventoryAPI</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testInventoryAPI</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Test Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
1
\ No newline at end of file
test.erp5.testInventoryAPI
\ No newline at end of file
tmp_work
\ No newline at end of file
......@@ -563,6 +563,8 @@ class SimulationTool(BaseTool):
omit_output=0,
omit_asset_increase=0,
omit_asset_decrease=0,
# interpolation_method
interpolation_method='default',
# group by
group_by_node=0,
group_by_node_category=0,
......@@ -599,6 +601,7 @@ class SimulationTool(BaseTool):
group_by_function_category=0,
group_by_function_category_strict_membership=0,
group_by_date=0,
group_by_time_sequence_list=None,
# sort_on
sort_on=None,
group_by=None,
......@@ -689,6 +692,43 @@ class SimulationTool(BaseTool):
date_dict['range'] = 'ngt'
if date_dict:
column_value_dict['date'] = date_dict
if interpolation_method != 'default':
if not group_by_time_sequence_list:
if not (from_date and (to_date or at_date)):
raise ValueError("date_range is required to use interpolation_method")
# if we consider flow, we also select movement whose mirror date is
# in the from_date/to_date range and movement whose
# start_date/stop_date contains the report range.
# The selected range is wider, but the selected movements will have an
# "interpolation_ratio" applied to their quantity and prices.
if to_date:
column_value_dict['date'] = ComplexQuery(
Query(date=(from_date, to_date), range='minmax'),
Query(mirror_date=(from_date, to_date), range='minmax'),
ComplexQuery(
Query(mirror_date=from_date, range='min'),
Query(date=to_date, range='max'),
operator="AND"),
ComplexQuery(
Query(date=from_date, range='min'),
Query(mirror_date=to_date, range='max'),
operator="AND"),
operator="OR"
)
else:
column_value_dict['date'] = ComplexQuery(
Query(date=(from_date, at_date), range='minngt'),
Query(mirror_date=(from_date, at_date), range='minngt'),
ComplexQuery(
Query(mirror_date=from_date, range='min'),
Query(date=at_date, range='ngt'),
operator="AND"),
ComplexQuery(
Query(date=from_date, range='min'),
Query(mirror_date=at_date, range='ngt'),
operator="AND"),
operator="OR"
)
else:
column_value_dict['date'] = {'query': [to_date], 'range': 'ngt'}
column_value_dict['mirror_date'] = {'query': [from_date], 'range': 'nlt'}
......@@ -980,6 +1020,8 @@ class SimulationTool(BaseTool):
new_kw['related_key_select_expression_list'] =\
related_key_select_expression_list
# XXX
sql_kw['group_by_time_sequence_list'] = group_by_time_sequence_list
return sql_kw, new_kw
#######################################################
......@@ -1063,6 +1105,17 @@ class SimulationTool(BaseTool):
output_simulation_state - only take rows with specified simulation_state
and quantity < 0
interpolation_method - Method to consider movements when calculating flows.
* (default): Consider the movement decreases 100% of source stock on
start date and increase 100% of the destination node stock on stop
date.
* linear: consider the movement decreases source stock and increase
destination stock linearly between start date and stop date.
* all_or_nothing: consider only movement who starts after the beginning of
the query period and finishes after the end of the query period.
* one_for_all: consider the movement fully as long as it is partially
contained in the query period.
ignore_variation - do not take into account variation in inventory
calculation (useless on getInventory, but useful on
getInventoryList)
......@@ -1189,6 +1242,8 @@ class SimulationTool(BaseTool):
group_by_section_category=0,
group_by_section_category_strict_membership=0,
group_by_resource=None,
group_by_time_sequence_list=(),
movement_list_mode=0,
group_by=None,
**ignored):
"""
......@@ -1208,7 +1263,8 @@ class SimulationTool(BaseTool):
group_by_function or group_by_mirror_section or group_by_payment or \
group_by_sub_variation or group_by_variation or \
group_by_movement or group_by_date or group_by_section_category or\
group_by_section_category_strict_membership:
group_by_section_category_strict_membership or \
group_by_time_sequence_list:
if group_by_resource is None:
group_by_resource = 1
new_group_by_dict['group_by_resource'] = group_by_resource
......@@ -1225,6 +1281,8 @@ class SimulationTool(BaseTool):
omit_simulation=0,
only_accountable=True,
default_stock_table='stock',
interpolation_method='default',
selection_domain=None, selection_report=None,
statistic=0, inventory_list=1,
precision=None, connection_id=None,
**kw):
......@@ -1302,6 +1360,7 @@ class SimulationTool(BaseTool):
'stock_table_id': default_stock_table,
'src__': src__,
'ignore_variation': ignore_variation,
'interpolation_method': interpolation_method,
'standardise': standardise,
'omit_simulation': omit_simulation,
'only_accountable': only_accountable,
......@@ -1315,7 +1374,9 @@ class SimulationTool(BaseTool):
# Get cached data
if getattr(self, "Resource_zGetInventoryCacheResult", None) is not None and \
optimisation__ and (not kw.get('from_date')) and \
'transformed_resource' not in kw:
'transformed_resource' not in kw \
and "category" not in str(kw) \
and "group_by_time_sequence_list" not in kw:
# Here is the different kind of date
# from_date : >=
# to_date : <
......@@ -1345,7 +1406,7 @@ class SimulationTool(BaseTool):
kw['from_date'] = cached_date
else:
cached_result = []
sql_kw, new_kw = self._generateKeywordDict(**kw)
sql_kw, new_kw = self._generateKeywordDict(interpolation_method=interpolation_method, **kw)
# Copy kw content as _generateSQLKeywordDictFromKeywordDict
# remove some values from it
try:
......@@ -1358,6 +1419,25 @@ class SimulationTool(BaseTool):
stock_sql_kw = self._generateSQLKeywordDictFromKeywordDict(
table=default_stock_table, sql_kw=sql_kw, new_kw=new_kw_copy)
stock_sql_kw.update(base_inventory_kw)
# TODO: move in _generateSQLKeywordDictFromKeywordDict
if interpolation_method in ('linear', 'all_or_nothing', 'one_for_all'):
# XXX only DateTime instance are supported
from_date = kw.get('from_date')
if from_date:
from_date = from_date.toZone("UTC")
to_date = kw.get('to_date')
if to_date:
to_date = to_date.toZone("UTC")
at_date = kw.get('at_date')
if at_date:
at_date = at_date.toZone("UTC")
stock_sql_kw['interpolation_method_from_date'] = from_date
stock_sql_kw['interpolation_method_to_date'] = to_date
stock_sql_kw['interpolation_method_at_date'] = at_date
elif interpolation_method != 'default':
raise ValueError("Unsupported interpolation_method %r" % (interpolation_method,))
delta_result = self.Resource_zGetInventoryList(
**stock_sql_kw)
if src__:
......
<dtml-let interpolation_ratio="SimulationTool_zGetInterpolationMethod(
stock_table_id=stock_table_id,
interpolation_method=interpolation_method,
interpolation_method_from_date=interpolation_method_from_date,
interpolation_method_to_date=interpolation_method_to_date,
interpolation_method_at_date=interpolation_method_at_date,
group_by_time_sequence_list=group_by_time_sequence_list,
src__=1)">
SELECT
<dtml-if expr="precision is not None">
SUM(ROUND(<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>, <dtml-var precision>)) AS inventory,
SUM(ROUND(<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>, <dtml-var precision>)) AS total_quantity,
SUM(ROUND(
<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>
* <dtml-var interpolation_ratio>, <dtml-var precision>)) AS inventory,
SUM(ROUND(
<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>
* <dtml-var interpolation_ratio>, <dtml-var precision>)) AS total_quantity,
<dtml-if convert_quantity_result>
SUM(ROUND(<dtml-var stock_table_id>.quantity * measure.quantity
<dtml-if quantity_unit_uid> / quantity_unit_conversion.quantity</dtml-if>
<dtml-if transformed_uid> * transformation.quantity</dtml-if>, <dtml-var precision>))
* <dtml-var interpolation_ratio>, <dtml-var precision>))
AS converted_quantity,
</dtml-if>
IFNULL(SUM(ROUND(<dtml-var stock_table_id>.total_price, <dtml-var precision>)), 0) AS total_price
IFNULL(SUM(ROUND(
<dtml-var stock_table_id>.total_price * <dtml-var interpolation_ratio>, <dtml-var precision>)), 0) AS total_price
<dtml-else>
SUM(<dtml-var stock_table_id>.quantity <dtml-if transformed_uid> * transformation.quantity</dtml-if>) AS inventory,
SUM(<dtml-var stock_table_id>.quantity <dtml-if transformed_uid> * transformation.quantity</dtml-if>) AS total_quantity,
SUM(<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>
* <dtml-var interpolation_ratio>
) AS inventory,
SUM(<dtml-var stock_table_id>.quantity
<dtml-if transformed_uid> * transformation.quantity</dtml-if>
* <dtml-var interpolation_ratio>
) AS total_quantity,
<dtml-if convert_quantity_result>
ROUND(SUM(<dtml-var stock_table_id>.quantity * measure.quantity
<dtml-if quantity_unit_uid> / quantity_unit_conversion.quantity</dtml-if>
<dtml-if transformed_uid> * transformation.quantity</dtml-if>), 12)
<dtml-if transformed_uid> * transformation.quantity</dtml-if> * <dtml-var interpolation_ratio>), 12)
AS converted_quantity,
</dtml-if>
IFNULL(SUM(<dtml-var stock_table_id>.total_price), 0) AS total_price
IFNULL(SUM(<dtml-var stock_table_id>.total_price * <dtml-var interpolation_ratio>), 0) AS total_price
</dtml-if>
<dtml-if inventory_list>
,
......@@ -56,6 +77,8 @@ SELECT
COUNT(DISTINCT <dtml-var stock_table_id>.uid) AS stock_uid,
MAX(<dtml-var stock_table_id>.date) AS date
</dtml-if>
<dtml-if group_by_time_sequence_list>, slot_index </dtml-if> <dtml-comment>XXX is this really needed? are empty slots returned ? </dtml-comment>
<dtml-if select_expression>, <dtml-var select_expression></dtml-if>
FROM
......@@ -69,6 +92,55 @@ FROM
</dtml-if>
</dtml-in>
, <dtml-var stock_table_id>
<dtml-if group_by_time_sequence_list>
RIGHT JOIN
( <dtml-in prefix="time_slot" expr="_.list(_.enumerate(group_by_time_sequence_list))">
SELECT
<dtml-sqlvar expr="time_slot_key" type="int"> slot_index,
<dtml-sqlvar expr="time_slot_item.get('from_date')" type="datetime" optional> slot_from_date,
<dtml-sqlvar expr="time_slot_item.get('at_date')" type="datetime" optional> slot_at_date,
<dtml-sqlvar expr="time_slot_item.get('to_date')" type="datetime" optional> slot_to_date
<dtml-unless time_slot_end>UNION ALL</dtml-unless>
</dtml-in> ) slots
ON
<dtml-if group_by_time_sequence_list>
(
( slot_from_date is not null AND
( slot_at_date is not null AND
GREATEST(`stock`.`date`, `stock`.`mirror_date`) >= slot_from_date AND
LEAST(`stock`.`date`, `stock`.`mirror_date`) <= slot_at_date
) OR (
(
slot_to_date is not null AND
GREATEST(`stock`.`date`, `stock`.`mirror_date`) >= slot_from_date AND
LEAST(`stock`.`date`, `stock`.`mirror_date`) < slot_to_date
) OR (
GREATEST(`stock`.`date`, `stock`.`mirror_date`) >= slot_from_date AND
slot_at_date is null AND slot_to_date is null
)
)
) OR (
slot_from_date is null AND (
( slot_at_date is not null AND
( LEAST(`stock`.`date`, `stock`.`mirror_date`) <= slot_at_date )
) OR LEAST(`stock`.`date`, `stock`.`mirror_date`) < slot_to_date
)
)
)
<dtml-else>
(
( slot_from_date is null OR stock.date >= slot_from_date )
AND ( slot_at_date is null OR stock.date <= slot_at_date )
AND ( slot_to_date is null OR stock.date < slot_to_date )
)
</dtml-if>
</dtml-if>
</dtml-if>
<dtml-if quantity_unit_uid> <dtml-comment>XXX quantity unit conversion will not work when using implict_join=False</dtml-comment>
LEFT JOIN quantity_unit_conversion ON
......@@ -116,9 +188,16 @@ WHERE
<dtml-if group_by_expression>
GROUP BY
<dtml-if transformed_uid>transformation.transformed_uid,</dtml-if>
<dtml-if group_by_time_sequence_list>slot_index,</dtml-if>
<dtml-var group_by_expression>
</dtml-if>
<dtml-if order_by_expression>
ORDER BY
<dtml-var order_by_expression>
<dtml-else>
<dtml-if group_by_time_sequence_list>
ORDER BY slot_index
</dtml-if>
</dtml-if>
</dtml-let>
\ No newline at end of file
......@@ -46,7 +46,12 @@ convert_quantity_result\n
quantity_unit_uid\n
stock_table_id=stock\n
transformed_uid\n
transformed_variation_text</string> </value>
transformed_variation_text\n
group_by_time_sequence_list:list\n
interpolation_method\n
interpolation_method_from_date=not_applicable\n
interpolation_method_to_date=not_applicable\n
interpolation_method_at_date=not_applicable</string> </value>
</item>
<item>
<key> <string>cache_time_</string> </key>
......
<dtml-if expr="interpolation_method == 'linear'">
<dtml-if group_by_time_sequence_list>
CASE
WHEN <dtml-var stock_table_id>.mirror_date = <dtml-var stock_table_id>.date THEN 1
ELSE (
UNIX_TIMESTAMP(
IFNULL(
LEAST(
IFNULL(slot_at_date, slot_to_date),
GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
),
GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
)
)
- UNIX_TIMESTAMP(
GREATEST(
IFNULL(
slot_from_date, TIMESTAMP(0)
),
LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
)
)
)
/ (
UNIX_TIMESTAMP(GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)) -
UNIX_TIMESTAMP(LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)) ) END
<dtml-else>
CASE
WHEN <dtml-var stock_table_id>.mirror_date = <dtml-var stock_table_id>.date THEN 1
ELSE (
UNIX_TIMESTAMP(LEAST(
<dtml-if interpolation_method_at_date>
<dtml-sqlvar interpolation_method_at_date type="datetime">
<dtml-else>
<dtml-sqlvar interpolation_method_to_date type="datetime">
</dtml-if>,
GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date) ))
- UNIX_TIMESTAMP(GREATEST(<dtml-sqlvar interpolation_method_from_date type="datetime">,
LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date))))
/ ( UNIX_TIMESTAMP(GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)) -
UNIX_TIMESTAMP(LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)) ) END
</dtml-if>
<dtml-elif expr="interpolation_method == 'all_or_nothing'">
CASE
WHEN (
-- movement contained in time frame
LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
>= <dtml-sqlvar interpolation_method_from_date type="datetime"> AND
GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
<dtml-if interpolation_method_at_date>
<= <dtml-sqlvar interpolation_method_at_date type="datetime">
<dtml-else>
< <dtml-sqlvar interpolation_method_to_date type="datetime">
</dtml-if>
) THEN 1
ELSE 0
END
<dtml-elif expr="interpolation_method == 'one_for_all'">
CASE
WHEN (
-- movement overlaps with time frame
GREATEST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
<= <dtml-sqlvar interpolation_method_from_date type="datetime"> OR
LEAST(<dtml-var stock_table_id>.date, <dtml-var stock_table_id>.mirror_date)
<dtml-if interpolation_method_at_date>
>= <dtml-sqlvar interpolation_method_at_date type="datetime">
<dtml-else>
> <dtml-sqlvar interpolation_method_to_date type="datetime">
</dtml-if>
) THEN 0
ELSE 1
END
<dtml-else>
1
</dtml-if>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="SQL" module="Products.ZSQLMethods.SQL"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_Use_Database_Methods_Permission</string> </key>
<value>
<list>
<string>Member</string>
</list>
</value>
</item>
<item>
<key> <string>arguments_src</string> </key>
<value> <string>stock_table_id\r\n
interpolation_method\r\n
interpolation_method_from_date\r\n
interpolation_method_to_date\r\n
interpolation_method_at_date\r\n
group_by_time_sequence_list:list</string> </value>
</item>
<item>
<key> <string>connection_id</string> </key>
<value> <string>cmf_activity_sql_connection</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>SimulationTool_zGetInterpolationMethod</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
##############################################################################
#
# Copyright (c) 2004 Nexedi SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
"""Unit Tests for Tracking API.
"""
import os
import unittest
from DateTime import DateTime
from Products.ERP5.tests.testInventoryAPI import InventoryAPITestCase
class TestTrackingList(InventoryAPITestCase):
"""Tests Item Tracking
"""
def testNodeUid(self):
getTrackingList = self.getSimulationTool().getTrackingList
start_date = DateTime()
def makeMovement(aggregate=None):
self._makeMovement(quantity=1, price=1,
aggregate_value=aggregate,
resource_value=self.resource,
start_date = start_date,
source_value=self.other_node,
destination_value=self.node)
item_uid = self.item.getUid()
other_item_uid = self.other_item.getUid()
node_uid = self.node.getUid()
self.assertEqual(len(getTrackingList(node_uid=node_uid,
at_date=start_date)),0)
makeMovement(aggregate=self.item)
result = getTrackingList(node_uid=node_uid,at_date=start_date)
self.assertEqual(len(result),1)
self.failIfDifferentSet([x.uid for x in result], [item_uid])
makeMovement(aggregate=self.other_item)
result = getTrackingList(node_uid=node_uid,at_date=start_date)
self.assertEqual(len(result),2)
self.failIfDifferentSet([x.uid for x in result], [item_uid, other_item_uid])
def testSeveralAggregateOnMovement(self):
getTrackingList = self.getSimulationTool().getTrackingList
start_date = DateTime()
def makeMovement(aggregate_list=None):
self._makeMovement(quantity=1, price=1,
aggregate_list=aggregate_list,
resource_value=self.resource,
start_date = start_date,
source_value=self.other_node,
destination_value=self.node)
item_uid = self.item.getUid()
other_item_uid = self.other_item.getUid()
node_uid = self.node.getUid()
self.assertEqual(len(getTrackingList(node_uid=node_uid,
at_date=start_date)),0)
makeMovement(aggregate_list=[self.item.getRelativeUrl(),
self.other_item.getRelativeUrl()])
result = getTrackingList(node_uid=node_uid,at_date=start_date)
self.assertEqual(len(result),2)
self.failIfDifferentSet([x.uid for x in result], [item_uid, other_item_uid])
def testDates(self):
"""
Test different dates parameters of getTrackingList.
"""
getTrackingList = self.getSimulationTool().getTrackingList
now = DateTime()
node_1 = self._makeOrganisation(title='Node 1')
node_2 = self._makeOrganisation(title='Node 2')
date_0 = now - 4 # Before first movement
date_1 = now - 3 # First movement
date_2 = now - 2 # Between both movements
date_3 = now - 1 # Second movement
date_4 = now # After last movement
self._makeMovement(quantity=1, price=1,
aggregate_value=self.item,
resource_value=self.resource,
start_date=date_1,
source_value=None,
destination_value=node_1)
self._makeMovement(quantity=1, price=1,
aggregate_value=self.item,
resource_value=self.resource,
start_date=date_3,
source_value=node_1,
destination_value=node_2)
node_1_uid = node_1.getUid()
node_2_uid = node_2.getUid()
date_location_dict = {
date_0: {'at_date': None, 'to_date': None},
date_1: {'at_date': node_1_uid, 'to_date': None},
date_2: {'at_date': node_1_uid, 'to_date': node_1_uid},
date_3: {'at_date': node_2_uid, 'to_date': node_1_uid},
date_4: {'at_date': node_2_uid, 'to_date': node_2_uid}
}
node_uid_to_node_number = {
node_1_uid: 1,
node_2_uid: 2
}
for date, location_dict in date_location_dict.iteritems():
for param_id, location_uid in location_dict.iteritems():
param_dict = {param_id: date}
uid_list = [x.node_uid for x in getTrackingList(
aggregate_uid=self.item.getUid(), **param_dict)]
if location_uid is None:
self.assertEqual(len(uid_list), 0)
else:
self.assertEqual(len(uid_list), 1)
self.assertEqual(uid_list[0], location_uid,
'%s=now - %i, aggregate should be at node %i but is at node %i' % \
(param_id, now - date, node_uid_to_node_number[location_uid], node_uid_to_node_number[uid_list[0]]))
def testFutureTrackingList(self):
movement = self._makeMovement(quantity=1, aggregate_value=self.item,)
getFutureTrackingList = self.portal.portal_simulation.getFutureTrackingList
node_uid = self.node.getUid()
for state in ('planned', 'ordered', 'confirmed', 'ready', 'started',
'stopped', 'delivered'):
movement.simulation_state = state
movement.reindexObject()
self.tic()
tracking_node_uid_list = [brain.node_uid for brain in
getFutureTrackingList(item=self.item.getRelativeUrl())]
self.assertEqual([node_uid], tracking_node_uid_list,
"%s != %s (state:%s)" % ([node_uid], tracking_node_uid_list, state))
for state in ('draft', 'cancelled', 'deleted'):
movement.simulation_state = state
movement.reindexObject()
self.tic()
tracking_node_uid_list = [brain.node_uid for brain in
getFutureTrackingList(item=self.item.getRelativeUrl())]
self.assertEqual([], tracking_node_uid_list,
"%s != %s (state:%s)" % ([], tracking_node_uid_list, state))
def _createScenarioToTestTrackingListMethod(self,
state_a="delivered", state_b="delivered", state_c="delivered"):
"""
Scenario:
Item 1 => A -> B -> C
Item 2 => A -> B
"""
now = DateTime()
item_a = self.getItemModule().newContent(title="Item 1")
item_b = self.getItemModule().newContent(title="Item 2")
node_a = self._makeOrganisation(title='Node A')
node_b = self._makeOrganisation(title='Node B')
node_c = self._makeOrganisation(title='Node C')
movement_a = self._makeMovement(source_value=node_a,
destination_value=node_b, resource=self.resource,
quantity=1, aggregate_value=item_a, start_date=now,
simulation_state=state_a)
movement_b = self._makeMovement(source_value=node_b,
destination_value=node_c, resource=self.resource,
quantity=1, aggregate_value=item_a, start_date=now+1,
simulation_state=state_b)
movement_c = self._makeMovement(source_value=node_a,
destination_value=node_b, resource=self.resource,
quantity=1, aggregate_value=item_b, start_date=now+1,
simulation_state=state_c)
self.tic()
return {"item_a": item_a, "item_b": item_b,
"node_a": node_a, "node_b": node_b,
"movement_a": movement_a, "movement_b": movement_b,
"movement_c": movement_c}
def testTrackingListWithOutputParameter(self):
"""
Add test to check if getTrackingList with output=1, returns only Item 2
if you search for items in B
"""
data_dict = self._createScenarioToTestTrackingListMethod()
item_a = data_dict['item_a']
item_b = data_dict['item_b']
node_b = data_dict['node_b']
getTrackingList = self.portal.portal_simulation.getTrackingList
path_list = [i.path for i in getTrackingList(
node_uid=node_b.getUid())]
self.assertTrue(item_a.getPath() in path_list,
"Is expected %s in B" % item_a.getPath())
self.assertTrue(item_b.getPath() in path_list,
"Is expected %s in B" % item_b.getPath())
path_list = [i.path for i in getTrackingList(
node_uid=node_b.getUid(), output=1)]
self.assertTrue(item_a.getPath() not in path_list,
"%s should not be in B" % item_a.getTitle())
self.assertTrue(item_b.getPath() in path_list,
"%s should be in B" % item_b.getTitle())
def testCurrentTrackingListWithOutputParameter(self):
"""
Add test to check if getCurrentTrackingList with B -> C started and not delivered,
returns only Item 2 if you search for items in B
"""
data_dict = self._createScenarioToTestTrackingListMethod(state_b="started")
item_a = data_dict['item_a']
item_b = data_dict['item_b']
node_b = data_dict['node_b']
getCurrentTrackingList = self.portal.portal_simulation.getCurrentTrackingList
path_list = [i.path for i in getCurrentTrackingList(
node_uid=node_b.getUid(), output=1)]
self.assertTrue(item_a.getPath() not in path_list,
"%s should not be in B" % item_a.getTitle())
self.assertTrue(item_b.getPath() in path_list,
"%s should be in B" % item_a.getTitle())
# TODO: missing tests for input=1
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestTrackingList))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment