Commit 24511e08 authored by Roque's avatar Roque

erp5_wendelin_data_lake_ingestion: use linked data streams for split ingestions

- test updated
parent 7beae64a
......@@ -93,6 +93,16 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
ingestion.stop()
else:
ingestion.deliver()
#link split datastreams
related_split_streams = portal_catalog(portal_type = "Data Stream",
reference = data_ingestion.getReference(),
sort_on=[('creation_date', 'ascending')])
predecessor = None
for stream in related_split_streams:
if predecessor:
predecessor.setSuccessorValue(stream)
stream.setPredecessorValue(predecessor)
predecessor = stream
except Exception as e:
context.log("ERROR handling split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
context.log(e)
......
......@@ -75,7 +75,8 @@ class TestDataIngestion(SecurityTestCase):
def getDataStreamChunkList(self, reference):
data_stream_list = self.portal.portal_catalog(
portal_type = 'Data Stream',
reference = reference)
reference = reference,
sort_on=[('creation_date', 'ascending')])
return data_stream_list
def ingestRequest(self, reference, eof, data_chunk, ingestion_policy):
......@@ -189,6 +190,23 @@ class TestDataIngestion(SecurityTestCase):
#all data streams are validated
self.assertSameSet(['validated' for x in data_stream_list],
[x.getValidationState() for x in data_stream_list])
#data streams are linked
data_stream_1 = data_stream_list[0].getObject()
data_stream_2 = data_stream_list[1].getObject()
data_stream_3 = data_stream_list[2].getObject()
data_stream_4 = data_stream_list[3].getObject()
# test successor
self.assertSameSet(data_stream_2.getRecursiveSuccessorValueList(), \
[data_stream_3, data_stream_4])
self.assertSameSet(data_stream_4.getRecursiveSuccessorValueList(), \
[])
# test predecessor
self.assertSameSet(data_stream_1.getRecursivePredecessorValueList(), \
[])
self.assertSameSet(data_stream_2.getRecursivePredecessorValueList(), \
[data_stream_1])
self.assertSameSet(data_stream_4.getRecursivePredecessorValueList(), \
[data_stream_3, data_stream_2, data_stream_1])
def test_03_DefaultWendelinConfigurationExistency(self):
"""
......
......@@ -46,8 +46,8 @@
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W:104, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:104, 76: Unused variable \'j\' (unused-variable)</string>
<string>W:105, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:105, 76: Unused variable \'j\' (unused-variable)</string>
</tuple>
</value>
</item>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment