Commit 891e2bb3 authored by Roque Porchetto

erp5_wendelin_telecom_ingestion: calculate hash of ingested files

- the ingestion reference now carries the size and hash of the ingested file (see the sketch below)
parent 545ba8c9
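As a quick illustration of the new reference layout (supplier/dataset/filename.../extension/eof/size/hash), the sketch below splits a sample reference the same way the updated portal script does. All sample values are hypothetical.

    # Hypothetical reference following the new layout introduced by this commit:
    # supplier/dataset/filename parts.../extension/eof/size/hash
    reference = "supplier-a/dataset-1/sub/dir/sample/txt/EOF/1024/d41d8cd98f00b204e9800998ecf8427e"

    record = reference.rsplit('/')
    length = len(record)
    if length < 7:
      raise ValueError("Data Ingestion reference is not well formated.")

    movement_dict = {
      'supplier': record[0],
      'dataset_reference': record[1],
      'filename': '/'.join(record[2:-4]),   # "sub/dir/sample"
      'extension': record[length-4],        # "txt"
      'eof': record[length-3],              # "EOF"
      'size': record[length-2],             # "1024"
      'hash': record[length-1],             # md5 hex digest
    }
    print(movement_dict)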
 from Products.ERP5Type.Log import log
 context.logEntry("[NEW INGESTION]")
-context.logEntry("Reference: %s" % reference)
+context.logEntry("Reference received: %s" % reference)
 record = reference.rsplit('/')
 length = len(record)
-specialise_reference = record[0] if (length >= 5) else "default"
-aggregate_data_set_reference = record[1] if (length >= 5) else "default"
-filename = '/'.join(record[2:-2]) if (length >= 5) else reference
-extension = record[length-2] if (length >= 5) else "fif"
-eof = record[length-1] if (length >= 5) else "eof"
+if (length < 7):
+  context.logEntry("[ERROR] Data Ingestion reference is not well formated")
+  raise ValueError("Data Ingestion reference is not well formated.")
+supplier = record[0]
+dataset_reference = record[1]
+filename = '/'.join(record[2:-4])
+extension = record[length-4]
+eof = record[length-3]
+size = record[length-2]
+hash = record[length-1]
 dict = { 'filename': filename,
          'extension': extension,
          'eof': eof,
-         'specialise_reference': specialise_reference,
-         'aggregate_data_set_reference': aggregate_data_set_reference,
-         'resource_reference': 'fif'
+         'supplier': supplier,
+         'dataset_reference': dataset_reference,
+         'resource_reference': 'fif',
+         'size': size,
+         'hash': hash
        }
 log("Returning dictionary: %s." % dict)
...
@@ -58,7 +58,7 @@
   </item>
   <item>
     <key> <string>description</string> </key>
-    <value> <string>Handles ingestion of fif images bytes sent to us from embulk.</string> </value>
+    <value> <string>Handles ingestion of raw files bytes sent to us from embulk.</string> </value>
   </item>
   <item>
     <key> <string>id</string> </key>
@@ -146,7 +146,7 @@
   </item>
   <item>
     <key> <string>serial</string> </key>
-    <value> <string>960.42903.37994.56780</string> </value>
+    <value> <string>968.22837.23288.17561</string> </value>
   </item>
   <item>
     <key> <string>state</string> </key>
@@ -164,7 +164,7 @@
   </tuple>
   <state>
     <tuple>
-      <float>1500296433.88</float>
+      <float>1536668328.08</float>
      <string>UTC</string>
    </tuple>
  </state>
...
 from Products.ERP5Type.Log import log
 from Products.ZSQLCatalog.SQLCatalog import Query, SimpleQuery
+import hashlib
+
+CHUNK_SIZE = 200000
+
+def getHash(data_stream):
+  hash_md5 = hashlib.md5()
+  data_stream_chunk = None
+  n_chunk = 0
+  chunk_size = CHUNK_SIZE
+  while True:
+    start_offset = n_chunk*chunk_size
+    end_offset = n_chunk*chunk_size+chunk_size
+    try:
+      data_stream_chunk = ''.join(data_stream.readChunkList(start_offset, end_offset))
+    except:
+      # data stream is empty
+      data_stream_chunk = ""
+    hash_md5.update(data_stream_chunk)
+    if data_stream_chunk == "": break
+    n_chunk += 1
+  return hash_md5.hexdigest()
+
 portal = context.getPortalObject()
 portal_catalog = portal.portal_catalog
@@ -17,7 +38,8 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
                                        portal_type = 'Data Stream',
                                        reference = data_ingestion.getReference())
   if data_stream is not None:
-    #calculate md5
+    hash_value = getHash(data_stream)
+    data_stream.setVersion(hash_value)
     data_stream.validate()
   data_ingestion.stop()
   context.logEntry("Data Ingestion %s stopped." % data_ingestion.getId())
@@ -40,7 +62,8 @@ for data_ingestion in portal_catalog(portal_type = "Data Ingestion",
       last_data_stream_id = data_stream.getId()
       portal.data_stream_module.deleteContent(data_stream.getId())
     if last_data_stream_id.endswith("EOF"):
-      #calculate md5
+      hash = getHash(full_data_stream)
+      full_data_stream.setVersion(hash)
       full_data_stream.validate()
       related_split_ingestions = portal_catalog(portal_type = "Data Ingestion",
                                                 reference = data_ingestion.getReference())
...
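For context, here is a minimal standalone sketch of the same chunked MD5 approach applied to an ordinary file. The committed getHash reads from an ERP5 Data Stream via readChunkList instead, and the file path below is hypothetical.

    import hashlib

    CHUNK_SIZE = 200000  # same chunk size as the script above

    def md5_of_file(path, chunk_size=CHUNK_SIZE):
      # Read the file in fixed-size chunks so arbitrarily large files
      # can be hashed without loading them fully into memory.
      hash_md5 = hashlib.md5()
      with open(path, 'rb') as f:
        while True:
          chunk = f.read(chunk_size)
          if not chunk:
            break
          hash_md5.update(chunk)
      return hash_md5.hexdigest()

    # e.g. md5_of_file('/tmp/ingested_file.bin')  # hypothetical path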
@@ -9,18 +9,20 @@ portal_catalog = portal.portal_catalog
 try:
-  # remove supplier and eof from reference
-  reference = '/'.join(reference.split('/')[1:-1])
+  # remove supplier, eof, size and hash from reference
+  reference = '/'.join(reference.split('/')[1:-3])
   context.logEntry("Data Ingestion reference: %s" % reference)
   data_ingestion_reference = reference
   eof = movement_dict.get('eof', 'EOF')
   resource_reference = movement_dict.get('resource_reference', None)
-  specialise_reference = movement_dict.get('specialise_reference', None)
+  supplier = movement_dict.get('supplier', None)
   extension = movement_dict.get('extension', None)
-  dataset_reference = movement_dict.get('aggregate_data_set_reference', None)
-  data_ingestion_id = '%s_%s_%s_%s' %(specialise_reference, data_ingestion_reference.replace("/","_").replace(" ","_"), now_string, eof)
+  dataset_reference = movement_dict.get('dataset_reference', None)
+  data_ingestion_id = '%s_%s_%s_%s' %(supplier, data_ingestion_reference.replace("/","_").replace(" ","_"), now_string, eof)
   context.logEntry("Data Ingestion ID: %s" % reference)
+  size = movement_dict.get('size', None) if movement_dict.get('size', None) != "" else None
+  hash_value = movement_dict.get('hash', None) if movement_dict.get('hash', None) != "" else None

   # search for applicable data ingestion
   data_ingestion = portal_catalog.getResultValue(
@@ -29,8 +31,12 @@ try:
   if data_ingestion is not None:
     if data_ingestion.getSimulationState() != 'started':
+      # CHECK HASH
+      #if hash is equal:
       context.logEntry("An older ingestion for reference %s was already done." % data_ingestion_reference)
       raise ValueError("An older ingestion for reference %s was already done." % data_ingestion_reference)
+      #else:
+      #portal.ERP5Site_invalidateIngestionObjects(data_ingestion.getReference())

   #if data_ingestion is None:
   specialise_value_list = [x.getObject() for x in portal_catalog.searchResults(
...
@@ -36,6 +36,9 @@ for stream in portal_catalog(**query_dict):
                 "reference": stream.getReference()}
     ingestions = portal_catalog(**ing_dict)
     if len(ingestions) == 1:
-      data_stream_list.append(["data_stream_module/"+stream.getId(), stream.getReference()])
+      data_stream_list.append({ "id": "data_stream_module/"+stream.getId(),
+                                "reference": stream.getReference(),
+                                "size": stream.getSize(),
+                                "hash": stream.getVersion() })

 return { "status_code": 0, "result": data_stream_list }
@@ -14,8 +14,8 @@ try:
   context.logEntry("Attempt to do a new ingestion with reference: %s" % reference)
   # remove supplier and eof from reference
-  data_ingestion_reference = '/'.join(reference.split('/')[1:-1])
-  EOF = reference.split('/')[-1]
+  data_ingestion_reference = '/'.join(reference.split('/')[1:-3])
+  EOF = reference.split('/')[-3]
   if data_ingestion_reference is "":
     context.logEntry("[ERROR] Data Ingestion reference is not well formated")
...
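To see what the trimming above produces, a small sketch on a hypothetical incoming reference:

    # Hypothetical incoming reference: supplier + ingestion reference + eof + size + hash
    reference = "supplier-a/dataset-1/sample/txt/EOF/1024/d41d8cd98f00b204e9800998ecf8427e"

    parts = reference.split('/')
    data_ingestion_reference = '/'.join(parts[1:-3])  # drop supplier prefix and eof/size/hash suffix
    EOF = parts[-3]

    print(data_ingestion_reference)  # dataset-1/sample/txt
    print(EOF)                       # EOF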
@@ -24,6 +24,7 @@ class TestDataIngestion(SecurityTestCase):
   TSV = "/tsv"
   GZ = "/gz"
   NII = "/nii"
+  SIZE_HASH = "/fake-size/fake-hash"
   RANDOM = "/" + ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(3)])
   CHUNK_SIZE_TXT = 50000
   CHUNK_SIZE_CSV = 25
@@ -81,7 +82,7 @@ class TestDataIngestion(SecurityTestCase):
     request = self.portal.REQUEST
     request.method = requestMethod
     request.auth = authentication
-    request.set('reference', reference + eof)
+    request.set('reference', reference + eof + self.SIZE_HASH)
     request.set('data_chunk', encoded_data_chunk)
     ingestion.ingest()
     self.tic()
...
@@ -46,15 +46,15 @@
     <key> <string>text_content_warning_message</string> </key>
     <value>
       <tuple>
-        <string>W:138, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
-        <string>W:161, 34: Unused variable \'i\' (unused-variable)</string>
-        <string>W:161, 76: Unused variable \'j\' (unused-variable)</string>
-        <string>W:184, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
-        <string>W:184, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
-        <string>W:200, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
-        <string>W:204, 10: No exception type(s) specified (bare-except)</string>
-        <string>W:212, 26: Unused variable \'e\' (unused-variable)</string>
-        <string>W:277, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
+        <string>W:139, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
+        <string>W:162, 34: Unused variable \'i\' (unused-variable)</string>
+        <string>W:162, 76: Unused variable \'j\' (unused-variable)</string>
+        <string>W:185, 4: Redefining name \'np\' from outer scope (line 9) (redefined-outer-name)</string>
+        <string>W:185, 4: Reimport \'numpy\' (imported line 9) (reimported)</string>
+        <string>W:201, 11: Using type() instead of isinstance() for a typecheck. (unidiomatic-typecheck)</string>
+        <string>W:205, 10: No exception type(s) specified (bare-except)</string>
+        <string>W:213, 26: Unused variable \'e\' (unused-variable)</string>
+        <string>W:278, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
         <string>W: 8, 0: Unused timedelta imported from datetime (unused-import)</string>
         <string>W: 10, 0: Unused import math (unused-import)</string>
         <string>W: 13, 0: Unused log imported from Products.ERP5Type.Log (unused-import)</string>
...