Commit 545ba8c9 authored by Roque Porchetto's avatar Roque Porchetto

erp5_wendelin_telecom_ingestion: reduce one step in ingestion cycle (started-stopped-delivered)

parent 54e7a310
from Products.ERP5Type.Log import log
portal = context.getPortalObject()
portal.ERP5Site_stopIngestionList()
portal.ERP5Site_startIngestionList()
portal.ERP5Site_createDataAnalysisList()
portal.ERP5Site_executeDataAnalysisList()
......@@ -10,12 +10,12 @@ now = DateTime()
now_minus_max = now - 1.0/24/60*9999
now_minus_4 = now - 1.0/24/60*4
catalog_kw = {'creation_date': {'query': (now_minus_max, now_minus_4), 'range': 'minmax'}, 'simulation_state': 'planned', 'portal_type': 'Data Ingestion'}
catalog_kw = {'creation_date': {'query': (now_minus_max, now_minus_4), 'range': 'minmax'}, 'simulation_state': 'started', 'portal_type': 'Data Ingestion'}
for data_ingestion in portal_catalog(**catalog_kw):
# search related data ingestions that are not old yet (less than 10 min)
# search related data ingestions that are not old yet (less than 4 min)
catalog_kw = {'creation_date': {'query': (now_minus_4, DateTime()), 'range': 'minmax'},
'simulation_state': 'planned',
'simulation_state': 'started',
'portal_type': 'Data Ingestion',
'reference': data_ingestion.getReference()}
invalidate = True
......@@ -36,8 +36,9 @@ for data_ingestion in portal_catalog(**catalog_kw):
context.logEntry("[WARNING] Could not invalidate data stream '%s', it was already invalidated or draft" % data_stream.getId())
context.logEntry("%s %s (id:%s) invalidated" % (data_stream.getPortalType(), data_stream.getReference(), data_stream.getId()))
try:
data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
data_ingestion.cancel()
if not data_ingestion.getReference().endswith("_invalid"):
data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
data_ingestion.deliver()
except:
context.logEntry("[WARNING] Could not invalidate/cancel data ingestion '%s', it was already invalidated/cancelled" % data_ingestion.getId())
context.logEntry("%s %s (id:%s) invalidated and cancelled" % (data_ingestion.getPortalType(), data_ingestion.getReference(), data_ingestion.getId()))
context.logEntry("[WARNING] Could not invalidate/deliver data ingestion '%s', it was already invalidated/deliver" % data_ingestion.getId())
context.logEntry("%s %s (id:%s) invalidated and delivered" % (data_ingestion.getPortalType(), data_ingestion.getReference(), data_ingestion.getId()))
from Products.ERP5Type.Log import log
from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
# start single planned ingestion (not split files)
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "planned",
id = "%EOF"):
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference())
if len(related_split_ingestions) == 1:
context.logEntry("Planned EOF ingestion found: " + data_ingestion.getId())
data_stream = portal_catalog.getResultValue(
portal_type = 'Data Stream',
reference = data_ingestion.getReference())
if data_stream is not None:
data_stream.validate()
data_ingestion.start()
context.logEntry("Data Ingestion %s started." % data_ingestion.getId())
# append split ingestions
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "planned",
id = "%001"):
context.logEntry("Planned split ingestion found: " + data_ingestion.getId())
try:
last_data_stream_id = ""
query = Query(portal_type="Data Stream", reference=data_ingestion.getReference())
result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
for data_stream in result_list:
if data_stream.getId() == data_ingestion.getId():
full_data_stream = data_stream
else:
full_data_stream.appendData(data_stream.getData())
last_data_stream_id = data_stream.getId()
portal.data_stream_module.deleteContent(data_stream.getId())
if last_data_stream_id.endswith("EOF"):
full_data_stream.validate()
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference())
for ingestion in related_split_ingestions:
if ingestion.getId() == full_data_stream.getId():
ingestion.start()
else:
ingestion.cancel()
context.logEntry("Chunks of split ingestion where appended into Data Stream %s. Corresponding Data Ingestion started." % full_data_stream.getId())
except Exception as e:
context.logEntry("ERROR appending split data streams for ingestion: %s" % data_ingestion.getReference())
context.logEntry(e)
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_startIngestionList</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
from DateTime import DateTime
from Products.ERP5Type.DateUtils import addToDate
from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
from Products.ERP5Type.Log import log
from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
portal_catalog = context.getPortalObject().portal_catalog
# search for dates older than one minute ago
old_start_date = addToDate(DateTime(), {'minute' : -1})
start_date_query = Query(**{'delivery.start_date': old_start_date, 'range': 'ngt'})
kw_dict = {"query": start_date_query,
"portal_type": "Data Ingestion",
"simulation_state": "started"}
parent_uid_list = [x.getUid() for x in portal_catalog(**kw_dict)]
if len(parent_uid_list) != 0:
log("Stopping %s ingestions..." %(str(len(parent_uid_list))))
context.logEntry("[Called by alarm] Stopping %s ingestions" %(str(len(parent_uid_list))))
kw_dict = {"portal_type": "Data Ingestion Line",
"resource_portal_type": "Data Product",
"parent_uid": parent_uid_list}
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
parent_uid_list = [x.getUid() for x in portal_catalog(**kw_dict)]
# start single started ingestion (not split files)
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%EOF"):
if not data_ingestion.getReference().endswith("_invalid"):
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference())
if len(related_split_ingestions) == 1:
context.logEntry("Started EOF ingestion found: " + data_ingestion.getId())
data_stream = portal_catalog.getResultValue(
portal_type = 'Data Stream',
reference = data_ingestion.getReference())
if data_stream is not None:
#calculate md5
data_stream.validate()
data_ingestion.stop()
context.logEntry("Data Ingestion %s stopped." % data_ingestion.getId())
for data_ingestion_line in portal_catalog(**kw_dict):
# append split ingestions
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%001"):
if not data_ingestion.getReference().endswith("_invalid"):
context.logEntry("Started split ingestion found: " + data_ingestion.getId())
try:
data_ingestion = data_ingestion_line.getParentValue()
resource = data_ingestion_line.getResourceValue()
log("data_ingestion: " + str(data_ingestion.getReference()))
log("resource: " + str(resource.getReference()))
if "big_data/ingestion/batch_ingestion" in resource.getUseList():
if data_ingestion.getSimulationState() == 'started':
data_ingestion.setStopDate(DateTime())
data_ingestion.stop()
log("DATA INGESTION STOPPED")
context.logEntry("Data Ingestion '%s' stopped." % data_ingestion.getId())
last_data_stream_id = ""
query = Query(portal_type="Data Stream", reference=data_ingestion.getReference())
result_list = portal_catalog(query=query, sort_on=(('creation_date', 'ascending'),))
for data_stream in result_list:
if data_stream.getId() == data_ingestion.getId():
full_data_stream = data_stream
else:
context.logEntry("[WARNING] Could not stop Data Ingestion '%s' because it is already stopped." % data_ingestion.getId())
else:
raise ValueError("resource.getUseList must be 'big_data/ingestion/batch_ingestion'")
full_data_stream.appendData(data_stream.getData())
last_data_stream_id = data_stream.getId()
portal.data_stream_module.deleteContent(data_stream.getId())
if last_data_stream_id.endswith("EOF"):
#calculate md5
full_data_stream.validate()
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference())
for ingestion in related_split_ingestions:
if ingestion.getId() == full_data_stream.getId():
ingestion.stop()
else:
ingestion.setReference(ingestion.getReference() + "_invalid")
ingestion.deliver()
context.logEntry("Chunks of split ingestion where appended into Data Stream %s. Corresponding Data Ingestion stopped." % full_data_stream.getId())
except Exception as e:
context.logEntry("[ERROR] Error stopping Data Ingestion '%s': %s." % (data_ingestion.getId()), str(e))
context.logEntry("ERROR appending split data streams for ingestion: %s" % data_ingestion.getReference())
context.logEntry(e)
......@@ -28,7 +28,7 @@ try:
reference = data_ingestion_reference)
if data_ingestion is not None:
if data_ingestion.getSimulationState() != 'planned':
if data_ingestion.getSimulationState() != 'started':
context.logEntry("An older ingestion for reference %s was already done." % data_ingestion_reference)
raise ValueError("An older ingestion for reference %s was already done." % data_ingestion_reference)
......@@ -120,8 +120,8 @@ try:
data_set.validate()
input_line.setDefaultAggregateValue(data_set)
data_ingestion.plan()
context.logEntry("Data Ingestion planned.")
data_ingestion.start()
context.logEntry("Data Ingestion started.")
data_operation = operation_line.getResourceValue()
data_stream = input_line.getAggregateDataStreamValue()
......
......@@ -28,11 +28,13 @@ try:
if data_ingestion is None:
return FALSE
if data_ingestion.getSimulationState() == 'planned':
if data_ingestion.getSimulationState() == 'started':
try:
if EOF != "EOF" and int(EOF) == 1:
# The user has restarted a split ingestion that is already being processed
return TRUE
# The user has restarted an interrupted split ingestion
context.log("[WARNING] User has restarted an interrumpted ingestion for reference %s." % data_ingestion.getReference())
context.log("[WARNING] Previous split ingestions for reference %s will be discarted and full ingestion restarted." % data_ingestion.getReference())
portal.ERP5Site_invalidateIngestionObjects(data_ingestion.getReference())
except:
pass
return FALSE
......
......@@ -118,11 +118,8 @@ class TestDataIngestion(SecurityTestCase):
raise StandardError("Could not find ingestion with reference %s" % reference)
def simulateIngestionAlarm(self, reference, now):
self.portal.ERP5Site_startIngestionList()
self.tic()
self.portal.ERP5Site_stopIngestionList()
self.tic()
self.manuallyStopIngestionWorkaround(reference, now)
self.portal.ERP5Site_createDataAnalysisList()
time.sleep(5)
......@@ -275,7 +272,7 @@ class TestDataIngestion(SecurityTestCase):
self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.PART_3, data_chunk_3, ingestion_policy)
time.sleep(1)
self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.EOF, data_chunk_4, ingestion_policy)
self.portal.ERP5Site_startIngestionList()
self.portal.ERP5Site_stopIngestionList()
self.tic()
ingestion_id, ingestion_reference = self.sanitizeReference(ingestion_reference)
data_stream = self.getDataStream(ingestion_reference)
......
......@@ -46,15 +46,15 @@
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W:141, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W:164, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:164, 76: Unused variable \'j\' (unused-variable)</string>
<string>W:187, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
<string>W:187, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
<string>W:203, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
<string>W:207, 10: No exception type(s) specified (bare-except)</string>
<string>W:215, 26: Unused variable \'e\' (unused-variable)</string>
<string>W:280, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W:138, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W:161, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:161, 76: Unused variable \'j\' (unused-variable)</string>
<string>W:184, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
<string>W:184, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
<string>W:200, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
<string>W:204, 10: No exception type(s) specified (bare-except)</string>
<string>W:212, 26: Unused variable \'e\' (unused-variable)</string>
<string>W:277, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W: 8, 0: Unused timedelta imported from datetime (unused-import)</string>
<string>W: 10, 0: Unused import math (unused-import)</string>
<string>W: 13, 0: Unused log imported from Products.ERP5Type.Log (unused-import)</string>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment