Commit e5b3546e authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: fixes in ingestion process

- postpone data transformation when the server is loaded
- invalidate old interrupted split ingestions in the stopIngestion script
- minor fix in the data set version increase
parent 985884a3
...
@@ -35,7 +35,7 @@ for line_data_ingestion in portal_catalog(**query_dict):
                                      destination_section = data_ingestion.getDestinationSection(),
                                      destination_project = data_ingestion.getDestinationProject())
   except Exception as e:
-    log(''.join(["[WARNING] Exception creating Data Analysis (already created?): ", str(e)]))
+    log(''.join(["[WARNING] Data Analysis already created: ", str(e)]))
     data_analysis = None
   if data_analysis is not None:
...
+from Products.ERP5Type.Log import log
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
 
+complex_files = ["fif", "nii", ".nii/gz"]
+
 for data_analysis in portal_catalog(portal_type = "Data Analysis",
                                     simulation_state = "planned"):
   try:
     if data_analysis.getSimulationState() == "planned":
-      data_analysis.start()
-      data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\
-        .DataAnalysis_executeDataOperation()
+      process = True
+      complex_file = False
+      for ext in complex_files:
+        if data_analysis.getReference().endswith(ext):
+          complex_file = True
+      if complex_file:
+        # if the server is busy and the file to process is complex, leave it for the next alarm
+        if portal.portal_activities.countMessage() > 100:
+          log("There are more than 100 activities running, so leaving data processing of file '%s' for next alarm" % data_analysis.getReference())
+          process = False
+      if process:
+        data_analysis.start()
+        data_analysis.activate(serialization_tag=str(data_analysis.getUid()))\
+          .DataAnalysis_executeDataOperation()
   except Exception as e:
-    context.logEntry("[ERROR] Error executing Data Analysis for '%s': %s" % (data_analysis.getId()), str(e))
+    context.logEntry("[ERROR] Error executing Data Analysis for '%s': %s" % (data_analysis.getId(), str(e)))
...
@@ -22,6 +22,21 @@ def getHash(data_stream):
     n_chunk += 1
   return hash_md5.hexdigest()
 
+def isInterruptedAbandonedSplitIngestion(reference):
+  from DateTime import DateTime
+  now = DateTime()
+  five_hours = 1.0/24/60*60*5  # 5 hours expressed as a fraction of a day
+  # started split data ingestions for this reference
+  catalog_kw = {'portal_type': 'Data Ingestion',
+                'simulation_state': 'started',
+                'reference': reference}
+  invalidate = True
+  for data_ingestion in portal_catalog(**catalog_kw):
+    # invalidate only if all related ingestions are old (more than 5 hours)
+    if (now - data_ingestion.getCreationDate()) < five_hours:
+      invalidate = False
+  return invalidate
+
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
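isInterruptedAbandonedSplitIngestion relies on Zope DateTime arithmetic: subtracting two DateTime objects yields a float number of days, so five hours is 5.0/24 of a day (the script writes it as 1.0/24/60*60*5). A sketch of the same age check, assuming the standard library datetime in place of Zope's DateTime:

from datetime import datetime, timedelta

FIVE_HOURS = timedelta(hours=5)

def all_older_than(creation_dates, now, threshold=FIVE_HOURS):
    # Invalidate only when every related ingestion is older than the threshold.
    return all(now - created >= threshold for created in creation_dates)

now = datetime(2019, 1, 1, 12, 0)
assert all_older_than([datetime(2019, 1, 1, 6, 0)], now) is True
assert all_older_than([datetime(2019, 1, 1, 6, 0), datetime(2019, 1, 1, 11, 0)], now) is False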
...
@@ -49,39 +64,42 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
                                      simulation_state = "started",
                                      id = "%001"):
   if not data_ingestion.getReference().endswith("_invalid"):
-    try:
-      last_data_stream_id = ""
-      query = Query(portal_type="Data Stream", reference=data_ingestion.getReference(), validation_state="draft")
-      result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
-      full_data_stream = None
-      for data_stream in result_list:
-        log(''.join(["Data stream for split ingestion: ", data_stream.getId()]))
-        if data_stream.getId() == data_ingestion.getId():
-          log("It is the base data stream (001)")
-          full_data_stream = data_stream
-        else:
-          log("It is not the base data stream, it is a part (!=001)")
-          if full_data_stream != None:
-            log("appending content to base data stream...")
-            full_data_stream.appendData(data_stream.getData())
-            last_data_stream_id = data_stream.getId()
-            portal.data_stream_module.deleteContent(data_stream.getId())
-      if last_data_stream_id.endswith("EOF"):
-        portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=True)
-        hash = getHash(full_data_stream)
-        full_data_stream.setVersion(hash)
-        if full_data_stream.getValidationState() != "validated":
-          full_data_stream.validate()
-        related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
-                                                  simulation_state = "started",
-                                                  reference = data_ingestion.getReference())
-        for ingestion in related_split_ingestions:
-          if ingestion.getId() == full_data_stream.getId():
-            if ingestion.getSimulationState() == "started":
-              ingestion.stop()
-          else:
-            ingestion.setReference(ingestion.getReference() + "_invalid")
-            ingestion.deliver()
-    except Exception as e:
-      context.logEntry("ERROR appending split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
-      context.logEntry(e)
+    if isInterruptedAbandonedSplitIngestion(data_ingestion.getReference()):
+      portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
+    else:
+      try:
+        last_data_stream_id = ""
+        query = Query(portal_type="Data Stream", reference=data_ingestion.getReference(), validation_state="draft")
+        result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
+        full_data_stream = None
+        for data_stream in result_list:
+          log(''.join(["Data stream for split ingestion: ", data_stream.getId()]))
+          if data_stream.getId() == data_ingestion.getId():
+            log("It is the base data stream (001)")
+            full_data_stream = data_stream
+          else:
+            log("It is not the base data stream, it is a part (!=001)")
+            if full_data_stream != None:
+              log("appending content to base data stream...")
+              full_data_stream.appendData(data_stream.getData())
+              last_data_stream_id = data_stream.getId()
+              portal.data_stream_module.deleteContent(data_stream.getId())
+        if last_data_stream_id.endswith("EOF"):
+          portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=True)
+          hash = getHash(full_data_stream)
+          full_data_stream.setVersion(hash)
+          if full_data_stream.getValidationState() != "validated":
+            full_data_stream.validate()
+          related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
+                                                    simulation_state = "started",
+                                                    reference = data_ingestion.getReference())
+          for ingestion in related_split_ingestions:
+            if ingestion.getId() == full_data_stream.getId():
+              if ingestion.getSimulationState() == "started":
+                ingestion.stop()
+            else:
+              ingestion.setReference(ingestion.getReference() + "_invalid")
+              ingestion.deliver()
+      except Exception as e:
+        context.logEntry("ERROR appending split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
+        context.logEntry(e)
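The reassembly loop depends on the naming convention visible in the catalog query: the base ingestion id ends with "001", part streams arrive in creation order under the same reference, and the final part's id ends with "EOF". A simplified sketch of that contract, with plain tuples standing in for Data Stream objects (reassemble is an illustrative name, not part of the script):

def reassemble(chunks):
    # chunks: (chunk_id, data) pairs sorted by creation date.
    full_data = None
    last_id = ""
    for chunk_id, data in chunks:
        if chunk_id.endswith("001"):   # base chunk: everything is appended to it
            full_data = data
        elif full_data is not None:    # part chunk: append, then it would be deleted
            full_data += data
            last_id = chunk_id
    # Only a trailing EOF part marks the stream as complete.
    return full_data if last_id.endswith("EOF") else None

assert reassemble([("X-001", b"ab"), ("X-002", b"cd"), ("X-EOF", b"ef")]) == b"abcdef"
assert reassemble([("X-001", b"ab"), ("X-002", b"cd")]) is None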
...
@@ -103,7 +103,7 @@ try:
                           reference = dataset_reference,
                           id = dataset_reference,
                           description = "Default description.",
-                          version = "001"
+                          version = "000"
                           )
   data_set.validate()
 except:
...
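The change from "001" to "000" matters because the data set version appears to be a zero-padded counter bumped on later ingestions, so starting at "000" makes the first bump produce "001". A hedged sketch of such an increment (increase_version is an illustrative helper, not shown in this diff):

def increase_version(version):
    # "000" -> "001", "009" -> "010", keeping the 3-digit zero padding.
    return "%03d" % (int(version) + 1)

assert increase_version("000") == "001"
assert increase_version("009") == "010"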