Commit 83e6d528 authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: new windows ebulk tool and handling of ingestions

- tool renamed to 'ebulk': both Windows and Unix scripts updated
- handling of interrupted split ingestions (check the reference and delete old pending ingestions)
parent 89b7cf0e
......@@ -31,3 +31,30 @@ for data_array in portal_catalog(**kw_dict):
log("Ivalid Data Array: " + str(data_array.getReference()))
context.logEntry("Ivalid Data Array: " + str(data_array.getReference()))
portal.ERP5Site_invalidateIngestionObjects(data_array.getReference())
# Invalidate old (more than 10 min) pending ingestions, i.e. split ingestions
# that were canceled/interrupted before completing.
# NOTE(review): indentation was lost in this rendering of the diff; the nesting
# of the loops and of the try/except must be taken from the original script.
from DateTime import DateTime
now = DateTime()
# Offsets are written as fractions of a day: 1.0/24/60 is one minute
# (assumes DateTime arithmetic is expressed in days -- TODO confirm).
# Lower bound of the window: 9999 minutes before now.
now_minus_max = now - 1.0/24/60*9999
# Upper bound of the window: 10 minutes before now.
now_minus_10 = now - 1.0/24/60*10
# Catalog filter: creation_date between the two bounds ('minmax' range) and
# simulation_state 'planned'.
catalog_kw = {'creation_date': {'query': (now_minus_max, now_minus_10), 'range': 'minmax'}, 'simulation_state': 'planned'}
for data_ingestion in portal_catalog(portal_type="Data Ingestion", **catalog_kw):
# invalidate related Data Stream
# Look up validated Data Streams sharing the ingestion's reference.
kw_dict = {"portal_type": "Data Stream",
"reference": data_ingestion.getReference(),
"validation_state": "Validated"}
for data_stream in portal_catalog(**kw_dict):
# Only tag the reference once: skip streams already suffixed "_invalid".
if not data_stream.getReference().endswith("_invalid"):
log("%s %s (id:%s) invalidated" % (data_stream.getPortalType(), data_stream.getReference(), data_stream.getId()))
data_stream.setReference(data_stream.getReference() + "_invalid")
# Best-effort invalidation: any error raised by invalidate() is ignored.
try:
data_stream.invalidate()
except:
pass # fails if it's already invalidated
# Cancel the ingestion itself, log it, then tag its reference with "_invalid"
# (the log line is emitted before the rename, so it shows the old reference).
data_ingestion.cancel()
log("%s %s (id:%s) invalidated and canceled" % (data_ingestion.getPortalType(), data_ingestion.getReference(), data_ingestion.getId()))
data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
......@@ -14,7 +14,8 @@ try:
context.logEntry("Attempt to do a new ingestion with reference: %s" % reference)
# remove supplier and eof from reference
data_ingestion_reference = '_'.join(reference.split('.')[1:])
data_ingestion_reference = '_'.join(reference.split('.')[1:-1])
EOF = reference.split('.')[-1]
data_ingestion = portal_catalog.getResultValue(
portal_type = 'Data Ingestion',
......@@ -24,6 +25,12 @@ try:
return FALSE
if data_ingestion.getSimulationState() == 'planned':
try:
if EOF != "EOF" and int(EOF) == 1:
# The user has restarted a split ingestion that is already being processed
return TRUE
except:
pass
return FALSE
context.logEntry("[ERROR] Data Ingestion reference %s already exists" % data_ingestion_reference)
......
......@@ -9,4 +9,5 @@ person_module/**
portal_ingestion_policies/wendelin_embulk
portal_categories/function/**
portal_categories/use/big_data/ingestion/batch_ingestion
data_stream_module/embulk_download_script
\ No newline at end of file
data_stream_module/embulk_download_script
data_stream_module/embulk_download_script_win
\ No newline at end of file
......@@ -9,4 +9,5 @@ person_module/**
portal_ingestion_policies/wendelin_embulk
portal_categories/function/**
portal_categories/use/big_data/ingestion/batch_ingestion
data_stream_module/embulk_download_script
\ No newline at end of file
data_stream_module/embulk_download_script
data_stream_module/embulk_download_script_win
\ No newline at end of file
......@@ -4,6 +4,7 @@ data_product_module/fif_array
data_product_module/fif_data
data_product_module/fif_descriptor
data_stream_module/embulk_download_script
data_stream_module/embulk_download_script_win
data_supply_module/**
data_transformation_module/**
person_module/**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment