Commit 9b8f8be0 authored by Klaus Wölfel's avatar Klaus Wölfel

Ingestion: Support different types of aggregate data sink

parent 072997f2
......@@ -28,7 +28,9 @@ def init_input_line(input_line, operation_line):
data_ingestion_batch_id = "%s-%s" %(today_string,
data_ingestion_batch_reference)
data_sink = None
data_sink_type_list = []
data_sink_list = []
data_product = None
if data_ingestion_batch_reference is not None:
data_ingestion_batch = portal_catalog.getResultValue(
portal_type = "Data Ingestion Batch",
......@@ -50,25 +52,32 @@ def init_input_line(input_line, operation_line):
aggregate_uid = data_ingestion_batch.getUid())
if previous_data_ingestion_line is not None:
data_sink = previous_data_ingestion_line\
.getAggregateDataSinkValue()
data_product = previous_data_ingestion_line.getResourceValue()
data_sink_type_list = data_product.getAggregatedPortalTypeList()
data_sink_list = previous_data_ingestion_line\
.getAggregateValueList(portal_type=data_sink_type_list)
input_line.setDefaultAggregateValue(data_ingestion_batch)
data_product = portal.portal_catalog.getResultValue(
portal_type = "Data Product",
reference = resource_reference)
data_sink_type = data_product.getAggregatedPortalType()
if data_sink is None:
data_sink = portal.getDefaultModule(data_sink_type).newContent(
portal_type = data_sink_type,
reference = "%s-%s" %(data_ingestion_reference, resource_reference))
data_sink.validate()
if not data_sink_list:
if not data_sink_type_list:
if data_product is None:
data_product = portal.portal_catalog.getResultValue(
portal_type = "Data Product",
reference = resource_reference)
data_sink_type_list = data_product.getAggregatedPortalTypeList()
for data_sink_type in data_sink_type_list:
# This should be more generic
if data_sink_type != "Progress Indicator":
data_sink = portal.getDefaultModule(data_sink_type).newContent(
portal_type = data_sink_type,
reference = "%s-%s" %(data_ingestion_reference, resource_reference))
data_sink.validate()
data_sink_list.append(data_sink)
input_line.setDefaultAggregateValue(data_sink)
input_line.setAggregateValueList(
input_line.getAggregateValueList() + data_sink_list)
input_line.setQuantity(1)
if data_ingestion is None:
......@@ -149,7 +158,8 @@ else:
data_operation = operation_line.getResourceValue()
parameter_dict = {
input_line.getReference(): input_line.getAggregateDataSinkValue(),
input_line.getReference(): \
{v.getPortalType(): v for v in input_line.getAggregateValueList()},
'bucket_reference': movement_dict.get('bucket_reference', None)}
return data_operation, parameter_dict
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment