Commit aca468c6 authored by Roque's avatar Roque

erp5_wendelin_data_lake_ingestion: split files are no longer...

erp5_wendelin_data_lake_ingestion:  split files are no longer appended/processed and no data streams are removed anymore
- all ingestions and data streams corresponding to split parts are kept
- client will receive the list of all data streams and it will be in charge of merging the parts during the download
- validate chunk data streams only when full file was ingested
- only process split ingestions when full file was ingested
- calculate full split file size
- calculate hash and add state control during data stream validation
- stop invalidated ingestions
parent 2d01c3f5
from Products.ERP5Type.Log import log from Products.ERP5Type.Log import log
from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery from Products.ZSQLCatalog.SQLCatalog import Query
import hashlib import hashlib
CHUNK_SIZE = 200000 CHUNK_SIZE = 200000
...@@ -14,7 +14,7 @@ def getHash(data_stream): ...@@ -14,7 +14,7 @@ def getHash(data_stream):
end_offset = n_chunk*chunk_size+chunk_size end_offset = n_chunk*chunk_size+chunk_size
try: try:
data_stream_chunk = ''.join(data_stream.readChunkList(start_offset, end_offset)) data_stream_chunk = ''.join(data_stream.readChunkList(start_offset, end_offset))
except: except Exception:
# data stream is empty # data stream is empty
data_stream_chunk = "" data_stream_chunk = ""
hash_md5.update(data_stream_chunk) hash_md5.update(data_stream_chunk)
...@@ -22,9 +22,18 @@ def getHash(data_stream): ...@@ -22,9 +22,18 @@ def getHash(data_stream):
n_chunk += 1 n_chunk += 1
return hash_md5.hexdigest() return hash_md5.hexdigest()
def isFinishedSplitIngestion(reference):
#check if all chunks of a split file were ingested
#that is if EOF chunk was ingested
reference_end_split = portal.ERP5Site_getIngestionReferenceDictionary()["split_end_suffix"]
eof_ingestion = portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
reference = reference,
id = "%"+reference_end_split)
return len(eof_ingestion) == 1
def isInterruptedAbandonedSplitIngestion(reference): def isInterruptedAbandonedSplitIngestion(reference):
from DateTime import DateTime from DateTime import DateTime
now = DateTime()
day_hours = 1.0/24/60*60*24 day_hours = 1.0/24/60*60*24
# started split data ingestions for reference # started split data ingestions for reference
catalog_kw = {'portal_type': 'Data Ingestion', catalog_kw = {'portal_type': 'Data Ingestion',
...@@ -52,58 +61,59 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion", ...@@ -52,58 +61,59 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion", related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference()) reference = data_ingestion.getReference())
if len(related_split_ingestions) == 1: if len(related_split_ingestions) == 1:
try:
data_stream = portal_catalog.getResultValue( data_stream = portal_catalog.getResultValue(
portal_type = 'Data Stream', portal_type = 'Data Stream',
reference = data_ingestion.getReference()) reference = data_ingestion.getReference())
if data_stream is not None: if data_stream is not None:
if data_stream.getVersion() is None:
hash_value = getHash(data_stream) hash_value = getHash(data_stream)
data_stream.setVersion(hash_value) data_stream.setVersion(hash_value)
if data_stream.getValidationState() != "validated": if data_stream.getValidationState() != "validated" and data_stream.getValidationState() != "published":
data_stream.validate() data_stream.validate()
if data_stream.getValidationState() != "published":
data_stream.publish()
if data_ingestion.getSimulationState() == "started": if data_ingestion.getSimulationState() == "started":
data_ingestion.stop() data_ingestion.stop()
except Exception as e:
context.log("ERROR stoping single ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
context.log(e)
else:
data_ingestion.deliver()
# append split ingestions # handle split ingestions
for data_ingestion in portal_catalog(portal_type = "Data Ingestion", for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started", simulation_state = "started",
id = "%"+reference_first_split): id = "%"+reference_first_split):
if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion): if not portal.ERP5Site_checkReferenceInvalidated(data_ingestion):
if isInterruptedAbandonedSplitIngestion(data_ingestion.getReference()): if isFinishedSplitIngestion(data_ingestion.getReference()):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
else:
try: try:
last_data_stream_id = ""
query = Query(portal_type="Data Stream", reference=data_ingestion.getReference(), validation_state="draft") query = Query(portal_type="Data Stream", reference=data_ingestion.getReference(), validation_state="draft")
result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),)) ingestion_data_stream_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
full_data_stream = None full_file_size = 0
for data_stream in result_list: for data_stream in ingestion_data_stream_list:
log(''.join(["Data stream for split ingestion: ", data_stream.getId()])) full_file_size += data_stream.getSize()
if data_stream.getId() == data_ingestion.getId(): hash_value = getHash(data_stream)
log("It is base data stream") data_stream.setVersion(hash_value)
full_data_stream = data_stream if data_stream.getValidationState() != "validated":
else: data_stream.validate()
log("It is not base data stream, it is a part") if data_stream.getId().endswith(reference_end_split):
if full_data_stream != None: if data_stream.getValidationState() != "published":
log("appending content to base data stream...") data_stream.publish()
full_data_stream.appendData(data_stream.getData())
last_data_stream_id = data_stream.getId() last_data_stream_id = data_stream.getId()
portal.data_stream_module.deleteContent(data_stream.getId()) #TODO: set full_file_size for EOF data stream to display the size of the full file
if last_data_stream_id.endswith(reference_end_split):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=True)
hash = getHash(full_data_stream)
full_data_stream.setVersion(hash)
if full_data_stream.getValidationState() != "validated":
full_data_stream.validate()
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion", related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started", simulation_state = "started",
reference = data_ingestion.getReference()) reference = data_ingestion.getReference())
for ingestion in related_split_ingestions: for ingestion in related_split_ingestions:
if ingestion.getId() == full_data_stream.getId(): if ingestion.getId() == last_data_stream_id:
if ingestion.getSimulationState() == "started": if ingestion.getSimulationState() == "started":
ingestion.stop() ingestion.stop()
else: else:
portal.ERP5Site_invalidateReference(ingestion)
ingestion.deliver() ingestion.deliver()
except Exception as e: except Exception as e:
context.logEntry("ERROR appending split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference())) context.log("ERROR handling split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
context.logEntry(e) context.log(e)
else:
if isInterruptedAbandonedSplitIngestion(data_ingestion.getReference()):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment