Commit f2d3826d authored and committed by Julien Muchembled

IngestionPolicy: accept data chunk as application/octet-stream

For backward compatibility with ingestion policy scripts, the message body is
turned into a 'data_chunk' request parameter.

By no longer having to convert to/from application/x-www-form-urlencoded,
this fixes a severe performance issue, in particular when ingesting a lot of
data with fluentd (Ruby uses regular expressions for the conversion).

/reviewed-on nexedi/wendelin!28
parent d2867d16
...@@ -53,13 +53,24 @@ class IngestionPolicy(Folder): ...@@ -53,13 +53,24 @@ class IngestionPolicy(Folder):
return self.portal_ingestion_policies.unpack(data) return self.portal_ingestion_policies.unpack(data)
security.declarePublic('ingest') security.declarePublic('ingest')
def ingest(self, **kw): def ingest(self, REQUEST, **kw):
""" """
Ingest chunk of raw data either from a Sensor or any of DAUs. Ingest chunk of raw data either from a Sensor or any of DAUs.
""" """
if self.REQUEST.method != 'POST': environ = REQUEST.environ
raise BadRequest('Only POST request is allowed.') method = environ.pop('REQUEST_METHOD')
try:
if method != 'POST':
raise BadRequest('Only POST request is allowed.')
if REQUEST._file is not None:
assert not REQUEST.form, REQUEST.form # Are cgi and HTTPRequest fixed ?
# Query string was ignored so parse again, faking a GET request.
# Such POST is legit: https://stackoverflow.com/a/14710450
REQUEST.processInputs()
REQUEST.form['data_chunk'] = REQUEST._file.read()
finally:
environ['REQUEST_METHOD'] = method
script_id = self.getScriptId() script_id = self.getScriptId()
if script_id is not None: if script_id is not None:
script = getattr(self, script_id, None) script = getattr(self, script_id, None)
......
...@@ -30,10 +30,13 @@ from Products.ERP5Type.tests.utils import createZODBPythonScript ...@@ -30,10 +30,13 @@ from Products.ERP5Type.tests.utils import createZODBPythonScript
from wendelin.bigarray.array_zodb import ZBigArray from wendelin.bigarray.array_zodb import ZBigArray
from DateTime import DateTime from DateTime import DateTime
from zExceptions import NotFound from zExceptions import NotFound
from cStringIO import StringIO
import httplib
import msgpack import msgpack
import numpy as np import numpy as np
import string import string
import random import random
import urllib
def getRandomString(): def getRandomString():
return 'test_%s' %''.join([random.choice(string.ascii_letters + string.digits) \ return 'test_%s' %''.join([random.choice(string.ascii_letters + string.digits) \
...@@ -84,7 +87,7 @@ class Test(ERP5TypeTestCase): ...@@ -84,7 +87,7 @@ class Test(ERP5TypeTestCase):
import pandas as _ import pandas as _
import matplotlib as _ import matplotlib as _
def test_01_IngestionFromFluentd(self): def test_01_IngestionFromFluentd(self, old_fluentd=False):
""" """
Test ingestion using a POST Request containing a msgpack encoded message Test ingestion using a POST Request containing a msgpack encoded message
simulating input from fluentd. simulating input from fluentd.
...@@ -103,12 +106,17 @@ class Test(ERP5TypeTestCase): ...@@ -103,12 +106,17 @@ class Test(ERP5TypeTestCase):
ingestion_policy, _, data_stream, data_array = \ ingestion_policy, _, data_stream, data_array = \
self.stepSetupIngestion(reference) self.stepSetupIngestion(reference)
# simulate fluentd by setting proper values in REQUEST # simulate fluentd
request.method = 'POST' body = msgpack.packb([0, real_data], use_bin_type=True)
data_chunk = msgpack.packb([0, real_data], use_bin_type=True) if old_fluentd:
request.set('reference', reference) env = {'CONTENT_TYPE': 'application/x-www-form-urlencoded'}
request.set('data_chunk', data_chunk) body = urllib.urlencode({'data_chunk': body})
ingestion_policy.ingest() else:
env = {'CONTENT_TYPE': 'application/octet-stream'}
response = self.publish(
ingestion_policy.getPath() + '/ingest?reference=' + reference,
env=env, request_method='POST', stdin=StringIO(body))
self.assertEqual(httplib.NO_CONTENT, response.getStatus())
data_stream_data = data_stream.getData() data_stream_data = data_stream.getData()
self.assertEqual(real_data, data_stream_data) self.assertEqual(real_data, data_stream_data)
...@@ -129,9 +137,11 @@ class Test(ERP5TypeTestCase): ...@@ -129,9 +137,11 @@ class Test(ERP5TypeTestCase):
# test ingesting with bad reference and raise of NotFound # test ingesting with bad reference and raise of NotFound
request.set('reference', reference + 'not_existing') request.set('reference', reference + 'not_existing')
self.assertRaises(NotFound, ingestion_policy.ingest) self.assertRaises(NotFound, ingestion_policy.ingest)
def test_01_1_IngestionFromOldFluentd(self):
self.test_01_IngestionFromFluentd(True)
def test_01_2_IngestionTail(self):
def test_01_1_IngestionTail(self):
""" """
Test real time convertion to a numpy array by appending data to a data stream. Test real time convertion to a numpy array by appending data to a data stream.
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment