Commit d1669f69 authored by Eteri's avatar Eteri Committed by Klaus Wölfel

New Functionality : Support measurement intervals

/reviewed-on !10
parent 01e6c52e
...@@ -2,6 +2,7 @@ portal = context.getPortalObject() ...@@ -2,6 +2,7 @@ portal = context.getPortalObject()
operation = None operation = None
use = None use = None
parameter_dict = {} parameter_dict = {}
context.checkConsistency(fixit=True) context.checkConsistency(fixit=True)
initial_product = context.getSpecialiseValue(portal_type="Data Transformation").getResourceValue() initial_product = context.getSpecialiseValue(portal_type="Data Transformation").getResourceValue()
for analysis_line in context.objectValues(portal_type="Data Analysis Line"): for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
...@@ -28,6 +29,9 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"): ...@@ -28,6 +29,9 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
parameter[base_category] = analysis_line.getVariationCategoryItemList( parameter[base_category] = analysis_line.getVariationCategoryItemList(
base_category_list=(base_category,))[0][0] base_category_list=(base_category,))[0][0]
reference = analysis_line.getReference() reference = analysis_line.getReference()
parameter["Start Date"] = analysis_line.getStartDate()
parameter["Stop Date"] = analysis_line.getStopDate()
# several lines with same reference wil turn the parameter into a list # several lines with same reference wil turn the parameter into a list
if reference in parameter_dict: if reference in parameter_dict:
if not isinstance(parameter_dict[reference], list): if not isinstance(parameter_dict[reference], list):
...@@ -35,7 +39,9 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"): ...@@ -35,7 +39,9 @@ for analysis_line in context.objectValues(portal_type="Data Analysis Line"):
parameter_dict[reference].append(parameter) parameter_dict[reference].append(parameter)
else: else:
parameter_dict[reference] = parameter parameter_dict[reference] = parameter
script_id = operation.getScriptId() script_id = operation.getScriptId()
out = getattr(operation_analysis_line, script_id)(**parameter_dict) out = getattr(operation_analysis_line, script_id)(**parameter_dict)
if out == 1: if out == 1:
......
...@@ -4,7 +4,6 @@ from Products.ERP5Type.Errors import UnsupportedWorkflowMethod ...@@ -4,7 +4,6 @@ from Products.ERP5Type.Errors import UnsupportedWorkflowMethod
portal = context.getPortalObject() portal = context.getPortalObject()
portal_catalog = portal.portal_catalog portal_catalog = portal.portal_catalog
now = DateTime() now = DateTime()
if not include_delivered: if not include_delivered:
...@@ -35,6 +34,7 @@ for movement in portal_catalog(query): ...@@ -35,6 +34,7 @@ for movement in portal_catalog(query):
delivery = movement.getParentValue() delivery = movement.getParentValue()
data_supply = delivery.getSpecialiseValue(portal_type="Data Supply") data_supply = delivery.getSpecialiseValue(portal_type="Data Supply")
data_supply_list = delivery.getSpecialiseValueList(portal_type="Data Supply") data_supply_list = delivery.getSpecialiseValueList(portal_type="Data Supply")
composed_data_supply = data_supply.asComposedDocument() composed_data_supply = data_supply.asComposedDocument()
# Get applicable transformation # Get applicable transformation
transformation_list = [] transformation_list = []
...@@ -48,6 +48,7 @@ for movement in portal_catalog(query): ...@@ -48,6 +48,7 @@ for movement in portal_catalog(query):
validation_state = "validated", validation_state = "validated",
resource_relative_url = movement.getResource())) resource_relative_url = movement.getResource()))
for transformation in transformation_list: for transformation in transformation_list:
is_shared_data_analysis = False is_shared_data_analysis = False
# Check if analysis already exists # Check if analysis already exists
data_analysis = portal_catalog.getResultValue( data_analysis = portal_catalog.getResultValue(
...@@ -84,6 +85,7 @@ for movement in portal_catalog(query): ...@@ -84,6 +85,7 @@ for movement in portal_catalog(query):
destination = delivery.getDestination(), destination = delivery.getDestination(),
destination_section = delivery.getDestinationSection(), destination_section = delivery.getDestinationSection(),
destination_project = delivery.getDestinationProject()) destination_project = delivery.getDestinationProject())
data_analysis.checkConsistency(fixit=True) data_analysis.checkConsistency(fixit=True)
# create input and output lines # create input and output lines
for transformation_line in transformation.objectValues( for transformation_line in transformation.objectValues(
...@@ -91,47 +93,61 @@ for movement in portal_catalog(query): ...@@ -91,47 +93,61 @@ for movement in portal_catalog(query):
"Data Transformation Operation Line"]): "Data Transformation Operation Line"]):
resource = transformation_line.getResourceValue() resource = transformation_line.getResourceValue()
quantity = transformation_line.getQuantity() quantity = transformation_line.getQuantity()
if isinstance(quantity, tuple): if isinstance(quantity, tuple):
quantity = quantity[0] quantity = quantity[0]
# In case of shared data anylsis only add additional input lines # In case of shared data anylsis only add additional input lines
if is_shared_data_analysis and quantity > -1: if is_shared_data_analysis and quantity > -1:
continue continue
aggregate_set = set() aggregate_set = set()
# manually add device to every line # manually add device to every line
aggregate_set.add(movement.getAggregateDevice()) aggregate_set.add(movement.getAggregateDevice())
if transformation_line.getPortalType() == \
"Data Transformation Resource Line": # If it is batch processing we additionally get items from the other
# at the moment, we only check for positive or negative quantity # batch movements and deliver the other batch movements
if quantity < 0: if transformation_line.getUse() == "big_data/ingestion/batch" and \
# it is an input line transformation_line.getPortalType() == \
# aggregate transformed item from data ingestion batch related to our "Data Transformation Resource Line" and quantity < 0:
# movement. If it is an input resource line, then we search for an batch_relative_url = movement.getAggregateDataIngestionBatch()
# ingestion line with the same resource. If it is an operation line
# then we search for an ingestion line with resource portal type if batch_relative_url is not None:
# Data Product related_movement_list = portal_catalog(
batch_relative_url = movement.getAggregateDataIngestionBatch() portal_type="Data Ingestion Line",
if batch_relative_url is not None: aggregate_relative_url=batch_relative_url,
related_movement_list = portal_catalog( resource_relative_url = resource.getRelativeUrl())
portal_type="Data Ingestion Line",
aggregate_relative_url=batch_relative_url,
resource_relative_url = resource.getRelativeUrl())
else:
# get related movements only from current data ingestion
related_movement_list = movement.getParentValue().searchFolder(
portal_type=["Data Ingestion Line", "Data Analysis Line"],
resource_relative_url = resource.getRelativeUrl())
for related_movement in related_movement_list: for related_movement in related_movement_list:
aggregate_set.update(related_movement.getAggregateSet()) #aggregate_set.update(related_movement.getAggregateSet())
if related_movement.getUse() == "big_data/ingestion/batch": related_movement.getParentValue().deliver()
related_movement.getParentValue().deliver()
# create new item based on item_type if it is not already aggregated
aggregate_type_set = set( # create new item based on item_type if it is not already aggregated
[portal.restrictedTraverse(a).getPortalType() for a in aggregate_set]) aggregate_type_set = set(
for item_type in transformation_line.getAggregatedPortalTypeList(): [portal.restrictedTraverse(a).getPortalType() for a in aggregate_set])
# create item if it does note exist yet. for item_type in transformation_line.getAggregatedPortalTypeList():
# Except if it is a Data Array Line, then it is currently created by # if item is not yet aggregated to this line, search it by related project
# data operation itself (probably this exception is inconsistent) # and source If the item is a data configuration or a device configuration
if item_type not in aggregate_type_set and item_type != "Data Array Line": # then we do not care the workflow state nor the related resource, nor
# the variation nor the related sensor. Data Array Lines are created
# by Data Operation.
if item_type not in aggregate_type_set:
if item_type in portal.getPortalDeviceConfigurationTypeList() + portal.getPortalDataConfigurationTypeList():
if item_type == "Status Configuration":
item = None
else:
item = portal.portal_catalog.getResultValue(
portal_type=item_type,
#validation_state="validated",
item_project_relative_url=delivery.getDestinationProject(),
item_source_relative_url=delivery.getSource())
elif item_type != "Data Array Line":
item = portal.portal_catalog.getResultValue( item = portal.portal_catalog.getResultValue(
portal_type=item_type, portal_type=item_type,
validation_state="validated", validation_state="validated",
...@@ -140,26 +156,21 @@ for movement in portal_catalog(query): ...@@ -140,26 +156,21 @@ for movement in portal_catalog(query):
item_project_relative_url=data_analysis.getDestinationProject(), item_project_relative_url=data_analysis.getDestinationProject(),
item_resource_uid=resource.getUid(), item_resource_uid=resource.getUid(),
item_source_relative_url=data_analysis.getSource()) item_source_relative_url=data_analysis.getSource())
if item is None:
module = portal.getDefaultModule(item_type) #if transformation_line.getRelativeUrl() == "data_transformation_module/woelfel_r0331_statistic_raw":
item = module.newContent(portal_type = item_type, # raise TypeError("JUST STOP")
title = transformation.getTitle(), if item is None:
reference = "%s-%s" %(transformation.getTitle(), module = portal.getDefaultModule(item_type)
delivery.getReference()), item = module.newContent(portal_type = item_type,
version = '001') title = transformation.getTitle(),
try: reference = "%s-%s" %(transformation.getTitle(),
item.validate() delivery.getReference()),
except AttributeError: version = '001')
pass try:
aggregate_set.add(item.getRelativeUrl()) item.validate()
# find other items such as device configuration and data configuration except AttributeError:
# from data ingestion and data supply pass
composed = data_analysis.asComposedDocument() aggregate_set.add(item.getRelativeUrl())
line_list = [l for l in delivery.objectValues(portal_type="Data Ingestion Line")]
line_list += [l for l in composed.objectValues(portal_type="Data Supply Line")]
for line in line_list:
if line.getResourceValue().getPortalType() == "Data Operation":
aggregate_set.update(line.getAggregateList())
data_analysis_line = data_analysis.newContent( data_analysis_line = data_analysis.newContent(
portal_type = "Data Analysis Line", portal_type = "Data Analysis Line",
...@@ -172,12 +183,12 @@ for movement in portal_catalog(query): ...@@ -172,12 +183,12 @@ for movement in portal_catalog(query):
quantity_unit = transformation_line.getQuantityUnit(), quantity_unit = transformation_line.getQuantityUnit(),
use = transformation_line.getUse(), use = transformation_line.getUse(),
aggregate_set = aggregate_set) aggregate_set = aggregate_set)
# for intput lines of first level analysis set causality and specialise # for intput lines of first level analysis set causality and specialise
if quantity < 0 and delivery.getPortalType() == "Data Ingestion": if quantity < 0 and delivery.getPortalType() == "Data Ingestion":
data_analysis_line.edit( data_analysis_line.edit(
causality_value = delivery, causality_value = delivery,
specialise_value_list = data_supply_list) specialise_value_list = data_supply_list)
data_analysis.checkConsistency(fixit=True) data_analysis.checkConsistency(fixit=True)
try: try:
data_analysis.start() data_analysis.start()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment