Commit 024bb516 authored by Roque Porchetto's avatar Roque Porchetto

erp5_wendelin_telecom_ingestion: fix handle invalids for split ingestions

parent 3f6ca327
...@@ -37,23 +37,35 @@ now = DateTime() ...@@ -37,23 +37,35 @@ now = DateTime()
now_minus_max = now - 1.0/24/60*9999 now_minus_max = now - 1.0/24/60*9999
now_minus_10 = now - 1.0/24/60*10 now_minus_10 = now - 1.0/24/60*10
catalog_kw = {'creation_date': {'query': (now_minus_max, now_minus_10), 'range': 'minmax'}, 'simulation_state': 'planned'} catalog_kw = {'creation_date': {'query': (now_minus_max, now_minus_10), 'range': 'minmax'}, 'simulation_state': 'planned', 'portal_type': 'Data Ingestion'}
for data_ingestion in portal_catalog(portal_type="Data Ingestion", **catalog_kw): for data_ingestion in portal_catalog(**catalog_kw):
# invalidate related Data Stream related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
kw_dict = {"portal_type": "Data Stream", simulation_state = "planned",
"reference": data_ingestion.getReference()} reference = data_ingestion.getReference())
for data_stream in portal_catalog(**kw_dict): invalidate = True
if not data_stream.getReference().endswith("_invalid"): for related_ingestion in related_split_ingestions:
data_stream.setReference(data_stream.getReference() + "_invalid") # only invalidate if all related ingestion are old
try: if len(portal_catalog(id=related_ingestion.getId(), **catalog_kw)) == 0:
data_stream.invalidate() context.logEntry("Data Ingestion %s is old, but it has related ingestion that are not. So it won't be invalidated yet." % data_ingestion.getId())
except: invalidate = False
context.logEntry("[WARNING] Could not invalidate data stream '%s', it was already invalidated or draft" % data_stream.getId()) break
context.logEntry("%s %s (id:%s) invalidated" % (data_stream.getPortalType(), data_stream.getReference(), data_stream.getId()))
try: if invalidate:
data_ingestion.setReference(data_ingestion.getReference() + "_invalid") # invalidate related Data Stream
data_ingestion.cancel() kw_dict = {"portal_type": "Data Stream",
except: "reference": data_ingestion.getReference()}
context.logEntry("[WARNING] Could not invalidate/cancel data ingestion '%s', it was already invalidated/cancelled" % data_ingestion.getId()) for data_stream in portal_catalog(**kw_dict):
context.logEntry("%s %s (id:%s) invalidated and cancelled" % (data_ingestion.getPortalType(), data_ingestion.getReference(), data_ingestion.getId())) if not data_stream.getReference().endswith("_invalid"):
data_stream.setReference(data_stream.getReference() + "_invalid")
try:
data_stream.invalidate()
except:
context.logEntry("[WARNING] Could not invalidate data stream '%s', it was already invalidated or draft" % data_stream.getId())
context.logEntry("%s %s (id:%s) invalidated" % (data_stream.getPortalType(), data_stream.getReference(), data_stream.getId()))
try:
data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
data_ingestion.cancel()
except:
context.logEntry("[WARNING] Could not invalidate/cancel data ingestion '%s', it was already invalidated/cancelled" % data_ingestion.getId())
context.logEntry("%s %s (id:%s) invalidated and cancelled" % (data_ingestion.getPortalType(), data_ingestion.getReference(), data_ingestion.getId()))
...@@ -42,7 +42,6 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion", ...@@ -42,7 +42,6 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
full_data_stream = data_stream full_data_stream = data_stream
else: else:
full_data_stream.appendData(data_stream.getData()) full_data_stream.appendData(data_stream.getData())
#data_stream.update_data("",0)
portal.data_stream_module.deleteContent(data_stream.getId()) portal.data_stream_module.deleteContent(data_stream.getId())
index += 1 index += 1
full_data_stream.validate() full_data_stream.validate()
......
...@@ -125,24 +125,9 @@ try: ...@@ -125,24 +125,9 @@ try:
data_ingestion.plan() data_ingestion.plan()
context.logEntry("Data Ingestion planned.") context.logEntry("Data Ingestion planned.")
#else:
# log("Planned Data Ingestion found for reference: " + str(data_ingestion))
# context.logEntry("Planned Data Ingestion found for reference: " + str(data_ingestion))
# # find ingestion line for current resource
# for line in data_ingestion.objectValues(portal_type="Data Ingestion Line"):
# if line.getResourceReference() == resource_reference:
# input_line = line
# elif line.getResourceValue().getPortalType() == "Data Operation":
# operation_line = line
data_operation = operation_line.getResourceValue() data_operation = operation_line.getResourceValue()
data_stream = input_line.getAggregateDataStreamValue() data_stream = input_line.getAggregateDataStreamValue()
#if eof == "EOF":
# data_ingestion.start()
# log("Data Ingestion started.")
# context.logEntry("Data Ingestion started.")
return data_operation, {'data_stream': data_stream} return data_operation, {'data_stream': data_stream}
except Exception as e: except Exception as e:
context.logEntry("[ERROR] Error during ingestion policy operation: " + str(e)) context.logEntry("[ERROR] Error during ingestion policy operation: " + str(e))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment