Commit edc668b5 authored by Roque


erp5_wendelin_data_lake_ingestion: split ingestion validation is done right after the last chunk (eof) is ingested instead of waiting for the alarm
- better handling of data stream hash calculation and publication
parent d7747faf
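In outline, the commit moves split-ingestion validation from the periodic alarm into the chunk-append path itself. A minimal sketch of the new flow, with hypothetical helper names (compute_md5, find_draft_siblings) standing in for the real scripts below:

def append_chunk(data_stream, decoded_chunk, split_end_suffix):
  data_stream.appendData(decoded_chunk)
  data_stream.setVersion(compute_md5(data_stream))    # keep the hash current
  if data_stream.getId().endswith(split_end_suffix):  # last chunk (eof)
    # validate every draft chunk of this file now, instead of
    # waiting for the next alarm run
    for sibling in find_draft_siblings(data_stream):
      sibling.validate()
    data_stream.validate()
    data_stream.publish()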
import hashlib
import base64
from Products.ZSQLCatalog.SQLCatalog import Query

CHUNK_SIZE = 200000

def getHash(data_stream):
  hash_md5 = hashlib.md5()
  n_chunk = 0
  chunk_size = CHUNK_SIZE
  while True:
    start_offset = n_chunk * chunk_size
    end_offset = n_chunk * chunk_size + chunk_size
    try:
      data_stream_chunk = ''.join(data_stream.readChunkList(start_offset, end_offset))
    except Exception:
      # data stream is empty
      data_stream_chunk = ""
    hash_md5.update(data_stream_chunk)
    if data_stream_chunk == "":
      break
    n_chunk += 1
  return hash_md5.hexdigest()
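# Note: hashing in fixed-size chunks keeps memory bounded for large streams
# and yields the same digest as hashing the whole content at once, e.g.
# (plain hashlib, illustrative only):
#   h = hashlib.md5()
#   for i in range(0, len(data), CHUNK_SIZE):
#     h.update(data[i:i + CHUNK_SIZE])
#   assert h.hexdigest() == hashlib.md5(data).hexdigest()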
decoded = base64.b64decode(data_chunk)
data_stream.appendData(decoded)
data_stream.setVersion(getHash(data_stream))
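# data_chunk is expected to arrive base64-encoded (presumably from the ebulk
# client, hence the b64decode above); for illustration:
#   base64.b64decode("aGVsbG8=")  ->  "hello"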
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog

reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]

# if last chunk of split ingestion -> validate all related data streams and publish the current one
if data_stream.getId().endswith(reference_end_split):
  query = Query(portal_type="Data Stream", reference=data_stream.getReference(), validation_state="draft")
  split_ingestion_data_stream_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
  #full_file_size = 0
  for chunk_data_stream in split_ingestion_data_stream_list:
    #full_file_size += chunk_data_stream.getSize()
    if chunk_data_stream.getValidationState() != "validated":
      chunk_data_stream.validate()
  if data_stream.getValidationState() != "validated":
    data_stream.validate()
  data_stream.publish()
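# Illustration of the eof check above (the suffix value itself is an
# assumption; it is read from
# ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]):
#   data_stream.getId()                  ->  e.g. "supplier_dataset_20190101_EOF"
#   data_stream.getId().endswith("EOF")  ->  True: all draft chunk streams
#   sharing this reference get validated and this stream is published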
from Products.ERP5Type.Log import log
from Products.ZSQLCatalog.SQLCatalog import Query
import hashlib
CHUNK_SIZE = 200000
@@ -88,25 +86,11 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
   if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
     if isFinishedSplitIngestion(data_ingestion.getReference()):
       try:
-        query = Query(portal_type="Data Stream", reference=data_ingestion.getReference(), validation_state="draft")
-        ingestion_data_stream_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
-        full_file_size = 0
-        for data_stream in ingestion_data_stream_list:
-          full_file_size += data_stream.getSize()
-          hash_value = getHash(data_stream)
-          data_stream.setVersion(hash_value)
-          if data_stream.getValidationState() != "validated":
-            data_stream.validate()
-          if data_stream.getId().endswith(reference_end_split):
-            if data_stream.getValidationState() != "published":
-              data_stream.publish()
-          last_data_stream_id = data_stream.getId()
-        #TODO: set full_file_size for EOF data stream to display the size of the full file
         related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
                                                   simulation_state = "started",
                                                   reference = data_ingestion.getReference())
         for ingestion in related_split_ingestions:
-          if ingestion.getId() == last_data_stream_id:
+          if ingestion.getId().endswith(reference_end_split):
             if ingestion.getSimulationState() == "started":
               ingestion.stop()
           else:
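For context, isFinishedSplitIngestion is defined earlier in this script, outside the hunk. A plausible reading, assuming the end-of-file ingestion can be found via its id suffix (an assumption for illustration, not the actual implementation):

def isFinishedSplitIngestion(reference):
  # the split upload is complete once its eof chunk has been ingested
  eof_ingestion = portal_catalog.getResultValue(
    portal_type="Data Ingestion",
    reference=reference,
    id="%%%s" % reference_end_split)  # hypothetical wildcard id filter
  return eof_ingestion is not None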
@@ -20,8 +20,6 @@ supplier = movement_dict.get('supplier', None)
extension = movement_dict.get('extension', None)
dataset_reference = movement_dict.get('dataset_reference', None)
data_ingestion_id = '%s_%s_%s_%s' %(supplier, dataset_reference, now_string, eof)
size = movement_dict.get('size', None) if movement_dict.get('size', None) != "" else None
hash_value = movement_dict.get('hash', None) if movement_dict.get('hash', None) != "" else None
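# The two conditional expressions above normalize empty strings (which ebulk
# may send for size/hash) to None, so later checks such as
# "hash_value is None" behave as intended.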
# search for applicable data ingestion
data_ingestion = portal_catalog.getResultValue(
@@ -85,12 +83,9 @@ for supply_line in composed.objectValues(portal_type = 'Data Supply Line'):
  input_line.setAggregateSet(
    input_line.getAggregateList() + operation_line.getAggregateList())

if hash_value is None or eof != reference_end_single: # do not set the hash for split ingestions; it is recalculated when chunks are appended
  hash_value = ""

data_stream = portal.data_stream_module.newContent(
  portal_type = "Data Stream",
  id = data_ingestion_id,
  version = hash_value,
  title = "%s%s" % (data_ingestion.getTitle(), "." + extension if extension != none_extension else ""),
  reference = data_ingestion_reference)
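# Illustration of the version field set above:
#   single ingestion (eof == reference_end_single) -> version = client-supplied hash
#   split ingestion  -> version = "" for now; the append script recomputes the
#                       MD5 after every chunk and the eof chunk finalizes it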
@@ -109,7 +104,7 @@ if dataset_reference is not None:
     # when a data set is uploaded from ebulk this means that "validation" is done at client side
     # thus set the state accordingly
     data_set.validate()
-  except:
+  except Exception:
     data_set = portal.data_set_module.get(dataset_reference)
     if portal.ERP5Site_checkReferenceInvalidated(data_set):
       portal.ERP5Site_revalidateReference(data_set)
@@ -122,7 +117,9 @@ data_ingestion.start()
data_operation = operation_line.getResourceValue()
data_stream = input_line.getAggregateDataStreamValue()

# if not split (one single ingestion) validate and publish the data stream
if eof == reference_end_single:
  data_stream.validate()
  data_stream.publish()

return data_operation, {'data_stream': data_stream}
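# Recap of the two ingestion paths after this commit (sketch):
#   single file : hash set from the client, stream validated and published here
#   split file  : chunks appended by the append script, which recomputes the
#                 hash; the eof chunk triggers validation and publication there,
#                 and the alarm only stops the started split ingestions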