Commit 5dd67af1 authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

software/fluentd: test Wendelin Learning Track

Add tests for Wendelin Learning Track tutorial
https://wendelin.nexedi.com/wendelin-Learning.Track
parent a40cdd3e
...@@ -41,6 +41,7 @@ setup(name=name, ...@@ -41,6 +41,7 @@ setup(name=name,
url="https://lab.nexedi.com/nexedi/slapos", url="https://lab.nexedi.com/nexedi/slapos",
packages=find_packages(), packages=find_packages(),
install_requires=[ install_requires=[
'msgpack',
'slapos.core', 'slapos.core',
'slapos.libnetworkcache', 'slapos.libnetworkcache',
'erp5.util', 'erp5.util',
......
...@@ -25,10 +25,25 @@ ...@@ -25,10 +25,25 @@
# #
############################################################################## ##############################################################################
import msgpack
import os import os
import random
import shutil
import socket
import struct
import subprocess
import tempfile
import time
import six import six
from six.moves.SimpleHTTPServer import SimpleHTTPRequestHandler
from six.moves.socketserver import StreamRequestHandler, TCPServer
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
from slapos.testing.utils import findFreeTCPPort
FLUENTD_PORT = 24224
FLUSH_INTERVAL = 1
setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass( setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(
...@@ -50,3 +65,215 @@ class FluentdTestCase(SlapOSInstanceTestCase): ...@@ -50,3 +65,215 @@ class FluentdTestCase(SlapOSInstanceTestCase):
for expected_process_name in expected_process_name_list: for expected_process_name in expected_process_name_list:
self.assertIn(expected_process_name, process_names) self.assertIn(expected_process_name, process_names)
class OneRequestServer(TCPServer):
address_family = socket.AF_INET6
timeout = 1
def get_first_data(self, flush_interval=1):
start = time.time()
while(not self.RequestHandlerClass.received_data
and time.time() - start < 10*flush_interval):
self.handle_request()
return self.RequestHandlerClass.received_data
class WendelinTutorialTestCase(FluentdTestCase):
@classmethod
def get_configuration(cls):
return ''
@classmethod
def getInstanceParameterDict(cls):
return {'conf_text': cls._conf,}
@classmethod
def measureDict(cls):
return {k: v.encode() for k, v in
zip((b'pressure', b'humidity', b'temperature'), cls._measurementList)}
@classmethod
def setUpClass(cls):
fluentd_dir = os.path.join(cls.computer_partition_root_path,
'software_release', 'parts', 'fluentd')
cls._fluentd_bin = os.path.join(fluentd_dir, 'bin', 'fluentd')
cls._gem_path = os.path.join(fluentd_dir, 'lib', 'ruby', 'gems', '1.8')
cls._tmp_dir = tempfile.mkdtemp()
cls._measurementList = cls.sensor_value_list()
cls._conf = cls.get_configuration()
super(FluentdTestCase, cls).setUpClass()
@classmethod
def sensor_value_list(cls):
return [str(value) for value in (round(random.uniform(870, 1084), 2),
round(random.uniform(0, 100), 2),
round(random.uniform(-20, 50), 3))]
def serve(self, port, request_handler_class):
server_address = (self._ipv6_address, port)
server = OneRequestServer(server_address, request_handler_class)
data = server.get_first_data(FLUSH_INTERVAL)
server.server_close()
return data
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls._tmp_dir)
super(FluentdTestCase, cls).tearDownClass()
def read_fluentd_conf(self, configuration):
conf_path = os.path.join(self._tmp_dir, 'fluentd.conf')
with open(conf_path, "w") as conf_file:
conf_file.write(configuration)
return subprocess.check_output(
[self._fluentd_bin, '-c', conf_path, '--dry-run'],
env={'GEM_PATH': self._gem_path},
universal_newlines=True,
)
def _test_configuration(self, expected_str):
self.assertRegexpMatches(
self.read_fluentd_conf(self._conf),
expected_str,
)
class FluentdHTTPRequestHandler(StreamRequestHandler):
received_data = b''
def handle(self):
data = self.rfile.readline().strip()
# ignore heartbeats (https://docs.fluentd.org/output/forward#heartbeat_type)
if len(data) > 0:
FluentdHTTPRequestHandler.received_data = data
# see https://wendelin.nexedi.com/wendelin-Learning.Track/wendelin-Tutorial.Setup.Fluentd.on.Sensor
class SensorConfTestCase(WendelinTutorialTestCase):
@classmethod
def get_configuration(cls):
script_path = os.path.join(cls._tmp_dir, "custom_read_bme280.py")
with open(script_path, "w") as script:
script.write(cls.sensor_script(cls._measurementList))
return cls.sensor_conf(script_path)
@classmethod
def sensor_conf(cls, script_path):
return '''\
<source>
@type exec
tag tag.name
command python %s
run_interval %ss
<parse>
keys pressure, humidity, temperature
</parse>
</source>
<match tag.name>
@type forward
<server>
name myserver1
host %s
</server>
<buffer>
flush_mode immediate
</buffer>
</match>''' % (script_path, FLUSH_INTERVAL, cls._ipv6_address)
@classmethod
def sensor_script(cls, measurementList):
return '''\
#!/usr/bin/python
# -*- coding: utf-8 -*-
print("%s")''' % "\t".join(measurementList)
def test_configuration(self):
self._test_configuration(
r'adding forwarding server \'myserver1\' host="%s" port=%s weight=60'
% (self._ipv6_address, FLUENTD_PORT)
)
def test_send_data(self):
tag, data, header = msgpack.unpackb(
self.serve(FLUENTD_PORT, FluentdHTTPRequestHandler),
raw=True,
)
self.assertEqual(b'tag.name', tag)
self.assertEqual(self.measureDict(), msgpack.unpackb(data)[-1])
self.assertEqual({b'compressed': b'text', b'size': 1}, header)
class WendelinHTTPRequestHandler(SimpleHTTPRequestHandler):
received_data = b''
def do_POST(self):
WendelinHTTPRequestHandler.received_data = self.rfile.read(
int(self.headers['Content-Length']))
self.send_response(200)
self.end_headers()
# see https://wendelin.nexedi.com/wendelin-Learning.Track/wendelin-Tutorial.Setup.Fluentd.on.IOTGateway
class GatewayConfTestCase(WendelinTutorialTestCase):
@classmethod
def gateway_conf(cls, fluentd_port, wendelin_port):
return '''\
<source>
@type forward
port %s
bind %s
</source>
<match tag.name>
@type wendelin
streamtool_uri http://[%s]:%s/erp5/portal_ingestion_policies/default
user foo
password bar
buffer_type file
buffer_path fluentd-buffer-file/
flush_interval %ss
</match>''' % (fluentd_port, cls._ipv6_address, cls._ipv6_address,
wendelin_port, FLUSH_INTERVAL)
@classmethod
def get_configuration(cls):
fluentd_port = findFreeTCPPort(cls._ipv6_address)
cls._fluentd_port = fluentd_port
wendelin_port = findFreeTCPPort(cls._ipv6_address)
cls._wendelin_port = wendelin_port
return cls.gateway_conf(fluentd_port, wendelin_port)
def test_configuration_file(self):
self._test_configuration('starting fluentd')
def test_wendelin_data_forwarding(self):
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.connect((self._ipv6_address, self._fluentd_port))
data = [
msgpack.ExtType(0, struct.pack('!Q', int(time.time()) << 32)),
self.measureDict(),
]
sock.sendall(
msgpack.packb([
b'tag.name',
msgpack.packb(data),
{b'size': 1, b'compressed': b'text'},
], use_bin_type=False),
)
sock.close()
self.assertEqual(
data,
msgpack.unpackb(
self.serve(self._wendelin_port, WendelinHTTPRequestHandler)),
)
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment