Commit 7139eaf0 authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: refactoring for ingestion reference utils

parent a5f52a27
reference_extension = input_stream_data.getReference().split("/")[-1]
portal = context.getPortalObject()
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_extension = input_stream_data.getReference().split(reference_separator)[-1]
result = str(context.processRawData(input_stream_data, output_array, output_descriptor, reference_extension))
record = reference.rsplit('/')
portal = context.getPortalObject()
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_length = portal.getIngestionReferenceDictionary()["reference_length"]
invalid_chars = portal.getIngestionReferenceDictionary()["invalid_chars"]
record = reference.rsplit(reference_separator)
length = len(record)
if (length < 7):
if (length < reference_length):
context.logEntry("[ERROR] In HandleFifEmbulkIngestion: Data Ingestion reference is not well formated")
raise ValueError("Data Ingestion reference is not well formated.")
invalid_chars = ["&", ";", "#", "%", '"', "+"]
for char in invalid_chars:
if char in reference:
context.logEntry("[ERROR] In HandleFifEmbulkIngestion: Data Ingestion reference contains chars that are not allowed")
......@@ -14,7 +18,7 @@ for char in invalid_chars:
supplier = record[0]
dataset_reference = record[1]
filename = '/'.join(record[2:-4])
filename = reference_separator.join(record[2:-4])
extension = record[length-4]
eof = record[length-3]
size = record[length-2]
......
......@@ -2,6 +2,9 @@ return {"invalid_suffix":"_invalid",
"split_end_suffix":"EOF",
"single_end_suffix":"END",
"split_first_suffix":"001",
"none_extension":"none",
"reference_separator":"/",
"complex_files_extensions":["fif", "nii", ".nii/gz"],
"reference_length":7,
"invalid_chars":["&", ";", "#", "%", '"', "+"],
}
from Products.ERP5Type.Log import log
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
complex_files = ["fif", "nii", ".nii/gz"]
complex_files = portal.getIngestionReferenceDictionary()["complex_files_extensions"]
for data_analysis in portal_catalog(portal_type = "Data Analysis",
simulation_state = "planned"):
......@@ -14,8 +14,8 @@ for data_analysis in portal_catalog(portal_type = "Data Analysis",
complex_file = True
if complex_file:
# if server is bussy and file to process is complex, leave for next alarm
if portal.portal_activities.countMessage() > 100:
log("There are more than 100 activities running, so leaving data processing of file '%s' for next alarm" % data_analysis.getReference())
if portal.portal_activities.countMessage() > 50:
log("There are more than 50 activities running, so leaving data processing of file '%s' for next alarm" % data_analysis.getReference())
process = False
if process:
data_analysis.start()
......
......@@ -14,34 +14,34 @@ try:
data_ingestion = portal_catalog.getResultValue(
portal_type = 'Data Ingestion',
id = data_stream.getId())
data_stream.setReference(data_stream.getReference() + "_invalid")
portal.InvalidateReference(data_stream)
data_stream.invalidate()
if not data_ingestion.getReference().endswith("_invalid"):
data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
if not portal.IsReferenceInvalidated(data_ingestion):
portal.InvalidateReference(data_ingestion)
data_an = portal_catalog.getResultValue(
portal_type = 'Data Analysis',
id = data_stream.getId())
if data_an != None:
data_an.setReference(data_an.getReference() + "_invalid")
portal.InvalidateReference(data_an)
data_array = portal_catalog.getResultValue(
portal_type = 'Data Array',
id = data_stream.getId())
if data_array != None:
data_array.setReference(data_array.getReference() + "_invalid")
portal.InvalidateReference(data_array)
data_array.invalidate()
else: # split ingestion interrumped and restarted
# invalidate draft datastreams and old started data ingestions
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
reference = reference):
if not data_ingestion.getReference().endswith("_invalid"):
data_ingestion.setReference(data_ingestion.getReference() + "_invalid")
if not portal.IsReferenceInvalidated(data_ingestion):
portal.InvalidateReference(data_ingestion)
data_ingestion.deliver()
for data_stream in portal_catalog(portal_type = "Data Stream",
validation_state = "draft",
reference = reference):
if not data_stream.getReference().endswith("_invalid"):
data_stream.setReference(data_stream.getReference() + "_invalid")
if not portal.IsReferenceInvalidated(data_stream):
portal.InvalidateReference(data_stream)
except Exception as e:
context.logEntry("ERROR in ERP5Site_invalidateSplitIngestions: " + str(e))
pass
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
none_extension = portal.getIngestionReferenceDictionary()["none_extension"]
# check new reference
data_ingestions = portal_catalog(portal_type = "Data Ingestion", reference = new_reference)
......@@ -8,13 +10,13 @@ if len(data_ingestions) > 0: raise "Error renaming: new reference '%s' already e
# rename data ingestions
data_ingestions = portal_catalog(portal_type = "Data Ingestion", reference = reference)
if len(data_ingestions) == 0: raise "Error renaming: could not find any data ingestion with reference '%s'." % reference
data_ingestion_title = '/'.join(new_reference.split('/')[1:-1])
data_ingestion_title = reference_separator.join(new_reference.split(reference_separator)[1:-1])
for data_ingestion in data_ingestions:
data_ingestion.setReference(new_reference)
data_ingestion.setTitle(data_ingestion_title)
extension = new_reference.split('/')[-1]
data_stream_title = "%s%s" % (data_ingestion_title, "."+extension if extension != "none" else "")
extension = new_reference.split(reference_separator)[-1]
data_stream_title = "%s%s" % (data_ingestion_title, "."+extension if extension != none_extension else "")
# rename data streams
data_streams = portal_catalog(portal_type = "Data Stream", reference = reference)
for data_stream in data_streams:
......
......@@ -40,11 +40,15 @@ def isInterruptedAbandonedSplitIngestion(reference):
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
reference_first_split = portal.getIngestionReferenceDictionary()["split_first_suffix"]
reference_end_split = portal.getIngestionReferenceDictionary()["split_end_suffix"]
# stop single started ingestion (not split files)
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%END"):
if not data_ingestion.getReference().endswith("_invalid"):
id = "%"+reference_end_single):
if not portal.IsReferenceInvalidated(data_ingestion):
related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
reference = data_ingestion.getReference())
if len(related_split_ingestions) == 1:
......@@ -62,8 +66,8 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
# append split ingestions
for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
simulation_state = "started",
id = "%001"):
if not data_ingestion.getReference().endswith("_invalid"):
id = "%"+reference_first_split):
if not portal.IsReferenceInvalidated(data_ingestion):
if isInterruptedAbandonedSplitIngestion(data_ingestion.getReference()):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=False)
else:
......@@ -75,16 +79,16 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
for data_stream in result_list:
log(''.join(["Data stream for split ingestion: ", data_stream.getId()]))
if data_stream.getId() == data_ingestion.getId():
log("It is base data stream (001)")
log("It is base data stream")
full_data_stream = data_stream
else:
log("It is not base data stream, it is a part (!=001)")
log("It is not base data stream, it is a part")
if full_data_stream != None:
log("appending content to base data stream...")
full_data_stream.appendData(data_stream.getData())
last_data_stream_id = data_stream.getId()
portal.data_stream_module.deleteContent(data_stream.getId())
if last_data_stream_id.endswith("EOF"):
if last_data_stream_id.endswith(reference_end_split):
portal.ERP5Site_invalidateSplitIngestions(data_ingestion.getReference(), success=True)
hash = getHash(full_data_stream)
full_data_stream.setVersion(hash)
......@@ -98,7 +102,7 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
if ingestion.getSimulationState() == "started":
ingestion.stop()
else:
ingestion.setReference(ingestion.getReference() + "_invalid")
portal.InvalidateReference(ingestion)
ingestion.deliver()
except Exception as e:
context.logEntry("ERROR appending split data streams for ingestion: %s - reference: %s." % (data_ingestion.getId(), data_ingestion.getReference()))
......
......@@ -6,12 +6,16 @@ now_string = now.strftime('%Y%m%d-%H%M%S-%f')[:-3]
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
none_extension = portal.getIngestionReferenceDictionary()["none_extension"]
try:
# remove supplier, eof, size and hash from reference
reference = '/'.join(reference.split('/')[1:-3])
reference = reference_separator.join(reference.split(reference_separator)[1:-3])
data_ingestion_reference = reference
eof = movement_dict.get('eof', 'END') if movement_dict.get('eof', 'END') != "" else 'END'
eof = movement_dict.get('eof', reference_end_single) if movement_dict.get('eof', reference_end_single) != "" else reference_end_single
resource_reference = movement_dict.get('resource_reference', None)
supplier = movement_dict.get('supplier', None)
extension = movement_dict.get('extension', None)
......@@ -27,7 +31,7 @@ try:
if data_ingestion is not None:
if data_ingestion.getSimulationState() != 'started':
if eof == "END": # if not split (one single ingestion), invalidate old ingestion
if eof == reference_end_single: # if not split (one single ingestion), invalidate old ingestion
portal.ERP5Site_invalidateIngestionObjects(data_ingestion_reference)
specialise_value_list = [x.getObject() for x in portal_catalog.searchResults(
......@@ -82,13 +86,13 @@ try:
input_line.setAggregateSet(
input_line.getAggregateList() + operation_line.getAggregateList())
if hash_value is None or eof != "END": # do not set hash if split, calculate when append
if hash_value is None or eof != reference_end_single: # do not set hash if split, calculate when append
hash_value = ""
data_stream = portal.data_stream_module.newContent(
portal_type = "Data Stream",
id = data_ingestion_id,
version = hash_value,
title = "%s%s" % (data_ingestion.getTitle(), "."+extension if extension != "none" else ""),
title = "%s%s" % (data_ingestion.getTitle(), "."+extension if extension != none_extension else ""),
reference = data_ingestion_reference)
input_line.setDefaultAggregateValue(data_stream)
......@@ -108,8 +112,8 @@ try:
data_set.validate()
except:
data_set = portal.data_set_module.get(dataset_reference)
if data_set.getReference().endswith("_invalid"):
data_set.setReference(data_set.getReference().replace("_invalid", ""))
if portal.IsReferenceInvalidated(data_set):
portal.RevalidateReference(data_set)
if data_set.getValidationState() == "invalidated":
data_set.validate()
input_line.setDefaultAggregateValue(data_set)
......@@ -119,7 +123,7 @@ try:
data_operation = operation_line.getResourceValue()
data_stream = input_line.getAggregateDataStreamValue()
if eof == "END":
if eof == reference_end_single:
data_stream.validate()
return data_operation, {'data_stream': data_stream}
......
......@@ -6,9 +6,11 @@ from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery, ComplexQuery
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
try:
data_set = portal.data_set_module.get(data_set_reference)
if data_set is None or data_set.getReference().endswith("_invalid"):
if data_set is None or portal.IsReferenceInvalidated(data_set):
return { "status_code": 0, "result": [] }
except Exception as e: # fails because unauthorized access
log("Unauthorized access to getDataStreamList.")
......@@ -20,7 +22,7 @@ if data_set is None:
query_dict = {
"portal_type": "Data Stream",
"reference": data_set.getReference() + "/%",
"reference": data_set.getReference() + reference_separator + "%",
"validation_state": 'validated'}
data_stream_list = []
......
......@@ -10,11 +10,15 @@ TRUE = "TRUE"
portal = context.getPortalObject()
portal_catalog = portal.portal_catalog
reference_separator = portal.getIngestionReferenceDictionary()["reference_separator"]
reference_end_single = portal.getIngestionReferenceDictionary()["single_end_suffix"]
reference_end_split = portal.getIngestionReferenceDictionary()["split_end_suffix"]
try:
# remove supplier and eof from reference
data_ingestion_reference = '/'.join(reference.split('/')[1:-3])
EOF = reference.split('/')[-3]
size = reference.split('/')[-2]
data_ingestion_reference = reference_separator.join(reference.split(reference_separator)[1:-3])
EOF = reference.split(reference_separator)[-3]
size = reference.split(reference_separator)[-2]
if data_ingestion_reference is "":
context.logEntry("[ERROR] Data Ingestion reference parameter for ingestionReferenceExists script is not well formated")
......@@ -28,12 +32,12 @@ try:
if data_ingestion != None:
try:
# check if user tries to restart the previous split ingestion
if (EOF == "" or EOF == "END") or (EOF != "EOF" and int(EOF) == 1):
if (EOF == "" or EOF == reference_end_single) or (EOF != reference_end_split and int(EOF) == 1):
# check if existing split ingestion is still being processed or if it is interrumped
data_ingestion_eof = portal_catalog.getResultValue(
portal_type = 'Data Ingestion',
reference = data_ingestion_reference,
id = "%EOF")
id = "%" + reference_end_split)
if data_ingestion_eof:
# reference exists: previous split ingestion is still being processed
return TRUE
......
......@@ -15,23 +15,24 @@ import hashlib
class TestDataIngestion(SecurityTestCase):
PART_1 = "/001"
PART_2 = "/002"
PART_3 = "/003"
EOF = "/EOF"
FIF = "/fif"
TXT = "/txt"
CSV = "/csv"
TSV = "/tsv"
GZ = "/gz"
NII = "/nii"
SIZE_HASH = "/fake-size/fake-hash"
SINGLE_INGESTION_END = "/"
RANDOM = "/" + ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(3)])
REFERENCE_SEPARATOR = "/"
PART_1 = REFERENCE_SEPARATOR + "001"
PART_2 = REFERENCE_SEPARATOR + "002"
PART_3 = REFERENCE_SEPARATOR + "003"
EOF = REFERENCE_SEPARATOR + "EOF"
FIF = REFERENCE_SEPARATOR + "fif"
TXT = REFERENCE_SEPARATOR + "txt"
CSV = REFERENCE_SEPARATOR + "csv"
TSV = REFERENCE_SEPARATOR + "tsv"
GZ = REFERENCE_SEPARATOR + "gz"
NII = REFERENCE_SEPARATOR + "nii"
SIZE_HASH = REFERENCE_SEPARATOR + "fake-size"+ REFERENCE_SEPARATOR + "fake-hash"
SINGLE_INGESTION_END = REFERENCE_SEPARATOR
RANDOM = REFERENCE_SEPARATOR + ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(3)])
CHUNK_SIZE_TXT = 50000
CHUNK_SIZE_CSV = 25
REF_PREFIX = "fake-supplier/fake-dataset/"
REF_SUPPLIER_PREFIX = "fake-supplier/"
REF_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR + "fake-dataset" + REFERENCE_SEPARATOR
REF_SUPPLIER_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR
INGESTION_SCRIPT = 'HandleFifEmbulkIngestion'
USER = 'zope'
PASS = 'roque5'
......@@ -49,6 +50,11 @@ class TestDataIngestion(SecurityTestCase):
def afterSetUp(self):
self.context = self.portal.UnitTest_getContext()
self.assertEqual(self.REFERENCE_SEPARATOR, self.portal.getIngestionReferenceDictionary()["reference_separator"])
self.assertEqual(self.INVALID, self.portal.getIngestionReferenceDictionary()["invalid_suffix"])
self.assertEqual(self.EOF, self.REFERENCE_SEPARATOR + self.portal.getIngestionReferenceDictionary()["split_end_suffix"])
self.assertEqual(self.SINGLE_INGESTION_END, self.REFERENCE_SEPARATOR)
self.assertEqual(self.PART_1, self.REFERENCE_SEPARATOR + self.portal.getIngestionReferenceDictionary()["split_first_suffix"])
def getRandomReference(self):
random_string = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(10)])
......@@ -58,13 +64,13 @@ class TestDataIngestion(SecurityTestCase):
return self.REF_PREFIX + reference + extension
def sanitizeReference(self, reference):
ingestion_reference = '/'.join(reference.split('/')[1:])
ingestion_reference = self.REFERENCE_SEPARATOR.join(reference.split(self.REFERENCE_SEPARATOR)[1:])
data_stream = self.getDataStream(ingestion_reference)
ingestion_id = data_stream.getId()
return ingestion_id, ingestion_reference
def getFullReference(self, ingestion_reference, size, hash_value):
return self.REF_SUPPLIER_PREFIX + ingestion_reference + "//" + str("") + "/" + ""
return self.REF_SUPPLIER_PREFIX + ingestion_reference + self.REFERENCE_SEPARATOR + self.REFERENCE_SEPARATOR + str("") + self.REFERENCE_SEPARATOR + ""
def chunks(self, l, n):
for i in xrange(0, len(l), n):
......@@ -323,7 +329,7 @@ class TestDataIngestion(SecurityTestCase):
def test_nii_gz_data_ingestion(self):
self.perform_nii_test(gz=True)
def test_data_ingestion_splitted_file(self):
def test_data_ingestion_split_file(self):
reference = self.getRandomReference()
ingestion_policy = self.getIngestionPolicy(reference, self.INGESTION_SCRIPT)
data_chunk_1 = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(250)])
......
......@@ -46,15 +46,15 @@
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W:175, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W:228, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:228, 76: Unused variable \'j\' (unused-variable)</string>
<string>W:251, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
<string>W:251, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
<string>W:267, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
<string>W:271, 10: No exception type(s) specified (bare-except)</string>
<string>W:279, 26: Unused variable \'e\' (unused-variable)</string>
<string>W:344, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W:181, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W:234, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:234, 76: Unused variable \'j\' (unused-variable)</string>
<string>W:257, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
<string>W:257, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
<string>W:273, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
<string>W:277, 10: No exception type(s) specified (bare-except)</string>
<string>W:285, 26: Unused variable \'e\' (unused-variable)</string>
<string>W:350, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W: 8, 0: Unused timedelta imported from datetime (unused-import)</string>
<string>W: 10, 0: Unused import math (unused-import)</string>
<string>W: 13, 0: Unused log imported from Products.ERP5Type.Log (unused-import)</string>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment