Commit 3c85c817 authored by Roque Porchetto's avatar Roque Porchetto

erp5_wendelin_telecom_ingestion: unit tests for different types of files

parent a49edabc
import mne
from mne.report import Report
import os
import json
import numpy as np
from lxml.html import parse
from Products.ERP5Type.Log import log
def getMNEReportJSON(file_name):
    """Return the summary table of an MNE report for `file_name` as JSON.

    The file is temporarily renamed to ``file_name + "_raw.fif"`` so it can
    be passed as the pattern to ``Report.parse_folder``. The report is saved
    to ``file_name + 'report.html'``, and the first HTML table with class
    ``table table-hover`` is turned into a dict mapping the text of each
    row's first cell to the text of its second cell.

    Parameters:
      file_name -- path of the file to build the report from.

    Returns:
      The JSON-encoded dict, or "" when any step raises (the error is
      logged). The original file name is restored and the report file is
      removed in every case.
    """
    # Bound before the ``try`` so the ``finally`` clause can always refer to
    # them; otherwise an early failure would surface as an unbound-name error
    # raised from the cleanup code instead of the logged "" result.
    pattern = file_name + "_raw.fif"
    report_file = file_name + 'report.html'
    try:
        os.rename(file_name, pattern)
        report = Report(verbose=True)
        report.parse_folder(data_path="./", pattern=[pattern])
        report.save(report_file, overwrite=True, open_browser=False)
        doc = parse(report_file)
        results = doc.xpath("//table[@class = 'table table-hover']")
        # Only the first matching table is used; an empty result raises
        # IndexError, which is handled by the generic handler below.
        data = {row[0].text: row[1].text for row in results[0]}
        return json.dumps(data)
    except Exception as e:
        log("Error while getting JSON Report: " + str(e))
        return ""
    finally:
        # Undo the temporary rename and drop the generated report.
        if os.path.exists(pattern):
            os.rename(pattern, file_name)
        if os.path.exists(report_file):
            os.remove(report_file)
def generateRawData(sample_data_stream):
log("-TEST- Getting raw content from sample data stream...")
content = sample_data_stream.getData()
......@@ -15,6 +43,8 @@ def generateRawData(sample_data_stream):
log("-TEST- Error getting raw data from file. Please check that sample_data_stream content corresponds to a valid fif raw file.")
raise e
json_data = getMNEReportJSON(file_name)
log("-TEST- Generating array from raw content...")
picks = mne.pick_types(raw.info)
data, times = raw[picks]
......@@ -26,4 +56,4 @@ def generateRawData(sample_data_stream):
if os.path.exists(file_name):
os.remove(file_name)
log("-TEST- Done.")
return content, array
return content, array, json_data
......@@ -52,7 +52,7 @@
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W: 20, 8: Unused variable \'times\' (unused-variable)</string>
<string>W: 50, 8: Unused variable \'times\' (unused-variable)</string>
</tuple>
</value>
</item>
......
......@@ -2,10 +2,14 @@ from Products.ERP5Type.tests.SecurityTestCase import SecurityTestCase
import string
import random
import time
import json
import csv
import os
from datetime import datetime, timedelta
import numpy as np
import math
import base64
from Products.ZSQLCatalog.SQLCatalog import Query
from Products.ERP5Type.Log import log
class TestDataIngestion(SecurityTestCase):
......@@ -13,6 +17,12 @@ class TestDataIngestion(SecurityTestCase):
PART = "/xxx"
EOF = "/EOF"
FIF = "/fif"
TXT = "/txt"
CSV = "/csv"
TSV = "/tsv"
RANDOM = "/" + ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(3)])
CHUNK_SIZE_TXT = 50000
CHUNK_SIZE_CSV = 25
REF_PREFIX = "fake-supplier/fake-dataset/"
INGESTION_SCRIPT = 'HandleFifEmbulkIngestion'
USER = 'zope'
......@@ -32,8 +42,8 @@ class TestDataIngestion(SecurityTestCase):
random_string = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(10)])
return 'UNIT-TEST-' + random_string
def getIngestionReference(self, reference):
return self.REF_PREFIX + reference + self.FIF
def getIngestionReference(self, reference, extension):
    """Build the ingestion reference: REF_PREFIX, then `reference`, then `extension`."""
    parts = (self.REF_PREFIX, reference, extension)
    return ''.join(parts)
def sanitizeReference(self, reference):
    """Return two forms of `reference` as a tuple.

    The first has every "/" replaced by "_"; the second is everything
    after the first "/" (an empty string when there is no "/").
    """
    flattened = reference.replace("/", "_")
    _, _, after_first_segment = reference.partition('/')
    return flattened, after_first_segment
......@@ -45,8 +55,8 @@ class TestDataIngestion(SecurityTestCase):
def generateRawDataBytesAndArray(self):
url = 'data_stream_module/mne_sample_for_test'
sample_data_stream = self.context.restrictedTraverse(url)
raw_data, array = self.context.generateRawData(sample_data_stream)
return raw_data, array
raw_data, array, json_data = self.context.generateRawData(sample_data_stream)
return raw_data, array, json_data
def getIngestionPolicy(self, reference, ingestion_script):
ingestion_policy = self.portal.portal_ingestion_policies.newContent( \
......@@ -82,6 +92,13 @@ class TestDataIngestion(SecurityTestCase):
reference = reference)
return data_array
def getDataDescriptor(self, reference):
    """Return the first catalogued Data Descriptor whose reference equals `reference`.

    The catalog is queried for portal type "Data Descriptor" with
    sort_on=(('id', 'DESC', 'int'),) and the results are scanned in that
    order. Returns None when no document matches.
    """
    descriptor_query = Query(portal_type="Data Descriptor")
    candidates = self.portal.portal_catalog(
        query=descriptor_query,
        sort_on=(('id', 'DESC', 'int'),))
    matching = (candidate for candidate in candidates
                if candidate.reference == reference)
    return next(matching, None)
def manuallyStopIngestionWorkaround(self, reference, now_time):
# TODO: replace the while loop with an approach similar to:
# https://lab.nexedi.com/nexedi/erp5/blob/master/bt5/erp5_scalability_test/SkinTemplateItem/portal_skins/erp5_scalability_test/ERP5Site_getScalabilityTestMetric.py#L6
......@@ -117,34 +134,96 @@ class TestDataIngestion(SecurityTestCase):
time.sleep(5)
self.tic()
def test_full_data_ingestion(self):
reference = self.getRandomReference()
def ingest(self, data_chunk, reference, extension):
ingestion_policy = self.getIngestionPolicy(reference, self.INGESTION_SCRIPT)
data_chunk, nparray = self.generateRawDataBytesAndArray()
ingestion_reference = self.getIngestionReference(reference)
ingestion_reference = self.getIngestionReference(reference, extension)
now = datetime.now()
self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.EOF, data_chunk, ingestion_policy)
ingestion_id, ingestion_reference = self.sanitizeReference(ingestion_reference)
self.simulateIngestionAlarm(ingestion_id, now)
return ingestion_reference
def checkDataObjects(self, ingestion_reference, data_chunk, array, json_data):
data_stream = self.getDataStream(ingestion_reference)
self.assertEqual(len(data_chunk), len(data_stream.getData()))
self.assertEqual(data_chunk, data_stream.getData())
self.simulateIngestionAlarm(ingestion_id, now)
data_array = self.getDataArray(ingestion_reference)
if array is None:
self.assertEqual(array, data_array.getArray())
else:
np.testing.assert_allclose(array, data_array.getArray()[:])
self.assertTrue(np.allclose(array, data_array.getArray()[:]))
data_descriptor = self.getDataDescriptor(ingestion_reference)
self.assertEqual(json_data, data_descriptor.getTextContent())
def perform_csv_test(self, extension, delimiter):
    """Ingest a generated delimited file and check the resulting data objects.

    A square matrix of random floats with CHUNK_SIZE_CSV + 10 rows and
    columns is written to a scratch file with `delimiter`, read back as
    raw text, and ingested under `extension`. The expected descriptor is
    the JSON dump of {"csv": <first CHUNK_SIZE_CSV parsed rows>} and the
    expected array is the first CHUNK_SIZE_CSV rows of the matrix.

    Parameters:
      extension -- reference suffix used for the ingestion (e.g. self.CSV).
      delimiter -- field separator used to write and to parse the file.
    """
    file_name = "file_name.csv"
    reference = self.getRandomReference()
    size = self.CHUNK_SIZE_CSV + 10
    array = [[random.random() for _ in range(size)] for _ in range(size)]
    try:
        np.savetxt(file_name, array, delimiter=delimiter)
        chunk = []
        with open(file_name, 'r') as csv_file:
            data_chunk = csv_file.read()
            # Rewind so the same handle can be parsed row by row.
            csv_file.seek(0)
            reader = csv.reader(csv_file, delimiter=delimiter)
            for index, line in enumerate(reader):
                if index >= self.CHUNK_SIZE_CSV:
                    break
                chunk.append(line)
        json_data = json.dumps({"csv": chunk})
        ingestion_reference = self.ingest(data_chunk, reference, extension)
        self.checkDataObjects(ingestion_reference, data_chunk, array[:self.CHUNK_SIZE_CSV], json_data)
    finally:
        # Remove the scratch file even when ingestion or an assertion fails,
        # so a failing run does not leave it behind for later tests.
        if os.path.exists(file_name):
            os.remove(file_name)
np.testing.assert_allclose(nparray, data_array.getArray()[:])
self.assertTrue(np.allclose(nparray, data_array.getArray()[:]))
def test_full_data_ingestion(self):
    # Ingest the content returned by generateRawDataBytesAndArray under the
    # FIF extension, then compare the stored objects with the expected
    # array and JSON returned alongside it.
    random_reference = self.getRandomReference()
    raw_content, expected_array, expected_json = self.generateRawDataBytesAndArray()
    stored_reference = self.ingest(raw_content, random_reference, self.FIF)
    self.checkDataObjects(stored_reference, raw_content, expected_array, expected_json)
def test_text_data_ingestion(self):
    # Ingest CHUNK_SIZE_TXT + 1000 random alphanumeric characters under the
    # TXT extension. The expected descriptor holds only the first
    # CHUNK_SIZE_TXT characters, and no array is expected.
    reference = self.getRandomReference()
    alphabet = string.ascii_letters + string.digits
    text_length = self.CHUNK_SIZE_TXT + 1000
    data_chunk = ''.join(random.choice(alphabet) for _ in xrange(text_length))
    ingestion_reference = self.ingest(data_chunk, reference, self.TXT)
    content_sample = data_chunk[:self.CHUNK_SIZE_TXT]
    json_data = json.dumps({"File content sample: ": content_sample})
    self.checkDataObjects(ingestion_reference, data_chunk, None, json_data)
def test_tsv_data_ingestion(self):
    # Tab-separated variant of the shared delimited-file scenario.
    self.perform_csv_test(self.TSV, "\t")
def test_csv_data_ingestion(self):
    # Comma-separated variant of the shared delimited-file scenario.
    self.perform_csv_test(self.CSV, ",")
def test_default_text_data_ingestion(self):
    # Same scenario as the TXT test, but ingested under the class-level
    # RANDOM extension ("/" plus three random alphanumeric characters).
    # The expected descriptor is again the first CHUNK_SIZE_TXT characters.
    reference = self.getRandomReference()
    alphabet = string.ascii_letters + string.digits
    text_length = self.CHUNK_SIZE_TXT + 1000
    data_chunk = ''.join(random.choice(alphabet) for _ in xrange(text_length))
    ingestion_reference = self.ingest(data_chunk, reference, self.RANDOM)
    content_sample = data_chunk[:self.CHUNK_SIZE_TXT]
    json_data = json.dumps({"File content sample: ": content_sample})
    self.checkDataObjects(ingestion_reference, data_chunk, None, json_data)
def test_data_ingestion_splitted_file(self):
reference = self.getRandomReference()
ingestion_policy = self.getIngestionPolicy(reference, self.INGESTION_SCRIPT)
data_chunk, nparray = self.generateRawDataBytesAndArray()
data_chunk = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(1000)])
data_chunk_1 = data_chunk[:int(math.floor(len(data_chunk)/2))]
data_chunk_2 = data_chunk[int(math.floor(len(data_chunk)/2)):]
ingestion_reference = self.getIngestionReference(reference)
ingestion_reference = self.getIngestionReference(reference, self.FIF)
self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.PART, data_chunk_1, ingestion_policy)
self.ingestRequest('POST', (self.USER, self.PASS), ingestion_reference, self.EOF, data_chunk_2, ingestion_policy)
......
......@@ -46,8 +46,9 @@
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W:151, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
<string>W:144, 16: Unused variable \'nparray\' (unused-variable)</string>
<string>W:166, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:166, 76: Unused variable \'j\' (unused-variable)</string>
<string>W:230, 4: Unused variable \'ingestion_id\' (unused-variable)</string>
</tuple>
</value>
</item>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment