Commit 67bd4087 authored by Roque

erp5_wendelin_data_lake_ingestion: data sets and data streams have consistent states

- state after ingestion is "validated"
parent 98c8e221
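In short, after this commit a freshly ingested Data Stream is left in the "validated" workflow state and is no longer advanced to "published". A minimal sketch of the resulting invariant, assuming a data_stream object fetched from the portal catalog as in the scripts changed below:

# Sketch of the post-ingestion invariant described by the commit message
# (illustration only, not part of the diff).
assert data_stream.getValidationState() == "validated"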
@@ -30,7 +30,7 @@ portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
 reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
-#if last chunk of split ingestion -> validate all related data streams and publish the current one:
+#if last chunk of split ingestion -> validate all related data streams
 if data_stream.getId().endswith(reference_end_split):
   query = Query(portal_type="Data Stream", reference=data_stream.getReference(), validation_state="draft")
   split_ingestion_data_stream_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
@@ -41,4 +41,3 @@ if data_stream.getId().endswith(reference_end_split):
     chunk_data_stream.validate()
   if data_stream.getValidationState() != "validated":
     data_stream.validate()
-    data_stream.publish()
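With the publish step removed, every chunk of a finished split ingestion shares the data stream reference and ends up in the "validated" state. A hedged sketch of how those chunks could be fetched afterwards, reusing the catalog query pattern from the script above (assumes the same portal_catalog, Query and data_stream bindings):

# Sketch: list all validated chunks of the split ingestion
# (same bindings as the script above; illustration only).
query = Query(portal_type="Data Stream", reference=data_stream.getReference(), validation_state="validated")
validated_chunk_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))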
@@ -69,8 +69,6 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
       data_stream.setVersion(hash_value)
     if data_stream.getValidationState() != "validated" and data_stream.getValidationState() != "published":
       data_stream.validate()
-    if data_stream.getValidationState() != "published":
-      data_stream.publish()
     if data_ingestion.getSimulationState() == "started":
       data_ingestion.stop()
   except Exception as e:
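This hunk also keeps the Data Ingestion and its Data Stream in step: the stream is validated (unless it was already validated or published) and a started ingestion is then stopped. A hedged consistency check one could run afterwards, assuming the same data_ingestion/data_stream pair as in the loop above and the standard ERP5 "stopped" state reached by stop():

# Sketch: once the ingestion has been stopped by the script above,
# its stream should no longer be in the "draft" state.
if data_ingestion.getSimulationState() == "stopped":
  assert data_stream.getValidationState() in ("validated", "published")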
@@ -117,9 +117,8 @@ data_ingestion.start()
 data_operation = operation_line.getResourceValue()
 data_stream = input_line.getAggregateDataStreamValue()
-# if not split (one single ingestion) validate and publish the data stream
+# if not split (one single ingestion) validate the data stream
 if eof == reference_end_single:
   data_stream.validate()
-  data_stream.publish()
 return data_operation, {'data_stream': data_stream}
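For the non-split path, reference_end_single is presumably read from the same ingestion reference dictionary used for the split suffix; the key name below is an assumption mirroring the "split_end_suffix" key that does appear in this diff:

# Sketch: likely origin of reference_end_single ("single_end_suffix" is an assumed key name).
reference_end_single = portal.ERP5Site_getIngestionReferenceDictionary()["single_end_suffix"]
# With this commit, a single (non-split) ingestion only validates the stream:
if eof == reference_end_single:
  data_stream.validate()  # no data_stream.publish() any more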