Commit 1fcc9d18 authored by Roque Porchetto's avatar Roque Porchetto

erp5_wendelin_telecom_ingestion: split ingestions are partially appended by alarm

parent 471a61d4
......@@ -4,14 +4,14 @@ from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
# NOTE(review): this span is a unified diff scraped from a web page; the +/-
# markers and all indentation were stripped, so removed (pre-commit) and added
# (post-commit) lines appear interleaved and the text is not runnable as-is.
# Comments below annotate the apparent intent of each piece.
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
# start single planned ingestion (not split files)
# Iterate over planned Data Ingestions whose id ends in "EOF", i.e. uploads
# that arrived as a single (non-split) chunk.
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "planned",
id = "%EOF"):
# NOTE(review): the next line duplicates the log call inside the `if` branch
# below -- presumably the pre-commit (removed) line; the commit appears to move
# the logging into the single-ingestion branch. TODO confirm against the raw diff.
context.logEntry("Planned EOF ingestion found: " + data_ingestion.getId())
# All planned ingestions sharing this reference; exactly one means the file
# was not split into chunks.
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
simulation_state = "planned",
reference = data_ingestion.getReference())
if len(related_split_ingestions) == 1:
context.logEntry("Planned EOF ingestion found: " + data_ingestion.getId())
# Fetch the Data Stream that shares the ingestion's reference.
data_stream = portal_catalog.getResultValue(
portal_type = 'Data Stream',
reference = data_ingestion.getReference())
# NOTE(review): hunk boundary -- at least one line of the loop body between the
# two hunks is not visible here; the span below is incomplete.
......@@ -19,25 +19,33 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
# Promote the stream and its ingestion once the single chunk is complete.
data_stream.validate()
data_ingestion.start()
context.logEntry("Data Ingestion %s started." % data_ingestion.getId())
# NOTE(review): the `elif` block below is the pre-commit handling of split
# ingestions (removed by this commit); it merged chunk streams by positional
# index. It is replaced by the dedicated "%001" loop added further down.
elif len(related_split_ingestions) > 1:
try:
query = Query(portal_type="Data Stream", reference=data_ingestion.getReference())
result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
index = 1
for data_stream in result_list:
if index == 1:
full_data_stream = data_stream
else:
full_data_stream.appendData(data_stream.getData())
portal.data_stream_module.deleteContent(data_stream.getId())
index += 1
# append split ingestions
# For every planned split ingestion (id ends in "001", i.e. the first chunk),
# concatenate all Data Stream chunks sharing its reference -- in creation-date
# order -- into the first chunk's stream, deleting each consumed chunk stream.
# Once the last appended chunk's id ends in "EOF", the merged stream is
# validated, the matching Data Ingestion is started and the remaining chunk
# ingestions are cancelled. Any failure is logged and the ingestion is left
# planned so a later alarm run can retry.
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
                                     simulation_state = "planned",
                                     id = "%001"):
  context.logEntry("Planned split ingestion found: " + data_ingestion.getId())
  try:
    # Tracks the id of the last chunk appended; stays "" when only the first
    # chunk exists, so the EOF check below fails and we wait for more chunks.
    last_data_stream_id = ""
    query = Query(portal_type="Data Stream", reference=data_ingestion.getReference())
    result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
    for data_stream in result_list:
      if data_stream.getId() == data_ingestion.getId():
        # The first chunk's stream accumulates the data of all the others.
        # NOTE(review): if the catalog ever returns a chunk before the first
        # one, full_data_stream is unbound and the NameError is swallowed by
        # the except below -- TODO confirm ordering guarantees.
        full_data_stream = data_stream
      else:
        full_data_stream.appendData(data_stream.getData())
        last_data_stream_id = data_stream.getId()
        portal.data_stream_module.deleteContent(data_stream.getId())
    if last_data_stream_id.endswith("EOF"):
      # All chunks received: publish the merged stream and promote exactly the
      # ingestion whose id matches it, cancelling the leftover chunk ingestions.
      full_data_stream.validate()
      related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
                                                reference = data_ingestion.getReference())
      for ingestion in related_split_ingestions:
        if ingestion.getId() == full_data_stream.getId():
          ingestion.start()
        else:
          ingestion.cancel()
      context.logEntry("Chunks of split ingestion were appended into Data Stream %s. Corresponding Data Ingestion started." % full_data_stream.getId())
  except Exception as e:
    context.logEntry("ERROR appending split data streams for ingestion: %s" % data_ingestion.getReference())
    context.logEntry(e)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment