Commit ca2254ee authored by Roque Porchetto's avatar Roque Porchetto

Unit tests for ingestion

parent 8fbd9152
from Products.ERP5Type.tests.SecurityTestCase import SecurityTestCase
import string
import random
import time
from DateTime import DateTime
import numpy as np
import math
class TestDataIngestion(SecurityTestCase):
PART = ".PART"
EOF = ".EOF"
def getTitle(self):
return "DataIngestionTest"
def getBusinessTemplateList(self):
return ('erp5_base', 'erp5_web', 'erp5_ingestion_mysql_innodb_catalog', 'erp5_ingestion', 'erp5_dms',
'erp5_wendelin', 'erp5_callables', 'erp5_core')
def afterSetUp(self):
self.context = self.portal.UnitTest_getContext()
def getRandomReference(self):
random_string = ''.join([random.choice(string.ascii_letters + string.digits) for _ in xrange(10)])
return 'UNIT_TEST-' + random_string
def chunks(self, l, n):
for i in xrange(0, len(l), n):
yield l[i:i+n]
def generateRawDataBytesAndArray(self):
array = np.random.randn(5, 1000)
raw_data = self.context.generateRawData(array)
return raw_data, array
def getIngestionPolicy(self, reference, ingestion_script):
ingestion_policy = self.portal.portal_ingestion_policies.newContent( \
id = reference,
portal_type ='Ingestion Policy',
reference = reference,
version = '001',
script_id = ingestion_script)
ingestion_policy.validate()
self.tic()
return ingestion_policy
def ingestRequest(self, requestMethod, authentication , reference, eof, data_chunk, ingestion):
request = self.portal.REQUEST
request.method = requestMethod
request.auth = authentication
request.set('reference', reference + eof)
request.set('data_chunk', data_chunk)
ingestion.ingest()
self.tic()
return
def getDataStream(self, reference):
data_stream = self.portal.portal_catalog.getResultValue(
portal_type = 'Data Stream',
reference = reference)
return data_stream
def getDataArray(self, reference):
data_array = self.portal.portal_catalog.getResultValue(
portal_type = 'Data Array',
reference = "Transform Raw FIF Data-" + reference)
return data_array
def manuallyStopIngestionWorkaround(self, reference):
now = DateTime().strftime('%Y%m%d')
data_ingestion_id = "%s-%s" %(now, reference)
url = 'data_ingestion_module/' + data_ingestion_id
data_ingestion = self.context.restrictedTraverse(url)
if data_ingestion.getSimulationState() == "started":
data_ingestion.stop()
self.tic()
def simulateIngestionAlarm(self, reference):
self.portal.ERP5Site_stopIngestionList()
self.tic()
self.manuallyStopIngestionWorkaround(reference)
self.portal.ERP5Site_createDataAnalysisList()
time.sleep(5)
self.tic()
self.portal.ERP5Site_executeDataAnalysisList()
time.sleep(5)
self.tic()
def test_full_data_ingestion(self):
reference = self.getRandomReference()
ingestion_policy = self.getIngestionPolicy(reference, 'HandleFifEmbulkIngestion')
data_chunk, nparray = self.generateRawDataBytesAndArray()
self.ingestRequest('POST', ('zope', 'roque5'), reference, self.EOF, data_chunk, ingestion_policy)
data_stream = self.getDataStream(reference)
self.assertEqual(len(data_chunk), len(data_stream.getData()))
self.assertEqual(data_chunk, data_stream.getData())
self.simulateIngestionAlarm(reference)
data_array = self.getDataArray(reference)
self.assertTrue(np.allclose(nparray, data_array.getArray()))
def test_data_ingestion_splitted_file(self):
reference = self.getRandomReference()
ingestion_policy = self.getIngestionPolicy(reference, 'HandleFifEmbulkIngestion')
data_chunk, nparray = self.generateRawDataBytesAndArray()
data_chunk_1 = data_chunk[:int(math.floor(len(data_chunk)/2))]
data_chunk_2 = data_chunk[int(math.floor(len(data_chunk)/2)):]
self.ingestRequest('POST', ('zope', 'roque5'), reference, self.PART, data_chunk_1, ingestion_policy)
data_stream = self.getDataStream(reference)
self.ingestRequest('POST', ('zope', 'roque5'), reference, self.EOF, data_chunk_2, ingestion_policy)
self.assertEqual(len(data_chunk), len(data_stream.getData()))
self.assertEqual(data_chunk, data_stream.getData())
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Test Component" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testDataIngestion</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.wendelin.testDataIngestion</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Test Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W:111, 16: Unused variable \'nparray\' (unused-variable)</string>
</tuple>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
test.erp5.wendelin.testDataIngestion
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment