Commit 543fe30a authored by Ekaterina's avatar Ekaterina

change: embulk test location

parent b4c13939
Test for Embulk software release
##############################################################################
#
# Copyright (c) 2019 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from setuptools import setup, find_packages
version = '0.0.1.dev0'
name = 'slapos.test.embulk'
with open("README.md") as f:
long_description = f.read()
setup(
name=name,
version=version,
description="Test for SlapOS' embulk",
long_description=long_description,
long_description_content_type='text/markdown',
maintainer="Nexedi",
maintainer_email="info@nexedi.com",
url="https://lab.nexedi.com/nexedi/slapos",
packages=find_packages(),
install_requires=[
'slapos.core',
'slapos.libnetworkcache',
'erp5.util',
'requests',
],
zip_safe=True,
test_suite='test',
)
Metadata-Version: 2.1
Name: slapos.test.embulk
Version: 0.0.1.dev0
Summary: Test for SlapOS' embulk
Home-page: https://lab.nexedi.com/nexedi/slapos
Maintainer: Nexedi
Maintainer-email: info@nexedi.com
License: UNKNOWN
Description: Test for Embulk software release
Platform: UNKNOWN
Description-Content-Type: text/markdown
README.md
setup.py
slapos.test.embulk.egg-info/PKG-INFO
slapos.test.embulk.egg-info/SOURCES.txt
slapos.test.embulk.egg-info/dependency_links.txt
slapos.test.embulk.egg-info/requires.txt
slapos.test.embulk.egg-info/top_level.txt
slapos.test.embulk.egg-info/zip-safe
\ No newline at end of file
slapos.core
slapos.libnetworkcache
erp5.util
requests
import os
import subprocess
import logging
from logging.handlers import RotatingFileHandler
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(
os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', 'software.cfg')))
class TransferDataTest(SlapOSInstanceTestCase):
def create_embulk_config(self, config, data_in_file_name, data_out_path):
config_string = "exec:\n max_threads: 1\n min_output_tasks: 1\n" + \
"in:\n type: filename\n path_prefix: %s\n parser:\n type: none-bin\n" + \
"out:\n type: file\n path_prefix: %s\n file_ext: wen\n formatter:\n type: single_value"
config_string = config_string % (data_in_file_name, data_out_path)
f = open(config,'w')
print >> f,config_string
f.close()
def test_transfer_data(self):
pwd = os.getcwd()
config = pwd + "/config.yml"
data_in_file_name = pwd + "/data-in.wen"
data_out_path = pwd + "/data_out"
data_in = "It works"
data_in_file = open(data_in_file_name, 'w')
data_in_file.write(data_in)
data_in_file.close()
self.create_embulk_config(config, data_in_file_name, data_out_path)
subprocess.call(['{{ java_location }}/bin/java', '-jar', '{{ embulk_location }}/embulk.jar',
'run', config, '-b', '{{ embulkPlugins_location }}/plugins'])
data_out_path = data_out_path + "000.00.wen"
data_out_file = open(data_out_path, 'rb')
data_out = data_out_file.read()
data_out_file.close()
self.assertEqual(data_in, base64.b64decode(data_out))
os.remove(config)
os.remove(data_in_file_name)
os.remove(data_out_path)
\ No newline at end of file
##############################################################################
#
# Copyright (c) 2019 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
import requests
from StringIO import StringIO
import SimpleHTTPServer
import SocketServer
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import os
import time
import utils
import multiprocessing
test_msg = "dummyInputSimpleIngest"
#url = "http://$${caddy-configuration:local_ip}:8443"
url = "http://10.0.46.242:8443"
posted_data = None
all_data = []
request_tag = ""
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
setUpModule, SlapOSInstanceTestCase = makeModuleSetUpAndTestCaseClass(
os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', 'software.cfg')))
###################################################
class TestServerHandler(BaseHTTPRequestHandler):
def _set_headers(self):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
def do_GET(self):
self._set_headers()
self.wfile.write("<html><body><h1>hi!</h1></body></html>")
def do_HEAD(self):
self._set_headers()
def do_POST(self):
content_length = int(self.headers['Content-Length']) # <--- Gets the size of data
post_data = self.rfile.read(content_length) # <--- Gets the data itself
self._set_headers()
self.wfile.write(post_data)
global posted_data
posted_data = post_data
print("post data from do_POST")
print(posted_data)
global all_data
all_data.append(post_data.split(" ")[1])
global request_tag
request_tag = find_tag(self.requestline,"=", " ")
class TestIngestion(FluentdPluginTestCase):
@classmethod
def startServer(cls):
port=9443
server_address = ('0.0.0.0', port)
server = HTTPServer(server_address, TestServerHandler)
cls.server_process = multiprocessing.Process(target=server.serve_forever)
cls.server_process.start()
print("Server started...")
print("local ipv4 = ")
print(os.environ.get('LOCAL_IPV4'))
@classmethod
def stopServer(cls):
cls.server_process.terminate()
def setUp(self):
self.startServer()
# port=9443
# server_address = ('$${slap-network-information:local-ipv4}', port)
# httpd = HTTPServer(server_address, TestServerHandler)
# thread = threading.Thread(target=httpd.serve_forever)
# thread.start()
# print 'Starting http...'
# time.sleep(15)
print("for fake commit")
def tearDown(self):
self.stopServer()
time.sleep(10)
# httpd.shutdown()
# print("all posted data : ")
# print(all_data)
def test_1_get(self):
print("############## TEST 1 ##############")
resp = requests.get(url)
self.assertEqual(resp.status_code, 200)
print (resp.status_code)
def test_2_ingest(self):
print("############## TEST 2 ##############")
start_fluentd_cat(test_msg, "tag_test_2")
time.sleep(10)
print("posted data from test 2")
print(posted_data)
if posted_data:
self.assertEqual(test_msg, posted_data.split(" ")[1])
else:
self.assertEqual(test_msg, posted_data)
time.sleep(10)
self.assertEqual(test_msg, posted_data.split(" ")[1])
def start_fluentd_cat(test_msg, tag):
os.environ["GEM_PATH"] ="$${fluentd-service:path}/lib/ruby/gems/1.8/"
fluentd_cat_exec_comand = '$${fluentd-service:path}/bin/fluent-cat --none ' + tag
os.system("echo + " + test_msg + " | " + fluentd_cat_exec_comand)
def find_tag(s, start, end):
return (s.split(start))[1].split(end)[0]
'''
def main():
port=9443
server_address = ('$${slap-network-information:local-ipv4}', port)
httpd = HTTPServer(server_address, TestServerHandler)
thread = threading.Thread(target=httpd.serve_forever)
thread.start()
print 'Starting http...'
time.sleep(15)
stream = StringIO()
runner = unittest.TextTestRunner(verbosity=2, stream=stream)
result = runner.run(unittest.makeSuite(TestPost))
print 'Tests run ', result.testsRun
print 'Errors ', result.errors
print "Failures ", result.failures
stream.seek(0)
print 'Test output\n', stream.read()
time.sleep(30)
httpd.shutdown()
print(posted_data)
print("all posted data : ")
print(all_data)
return result.testsRun, result.errors, result.failures, stream.getvalue()
if __name__ == "__main__":
main()
'''
import os
import subprocess
import unittest
import base64
class TransferDataTest(unittest.TestCase):
def create_embulk_config(self, config, data_in_file_name, data_out_path):
config_string = "exec:\n max_threads: 1\n min_output_tasks: 1\n" + \
"in:\n type: filename\n path_prefix: %s\n parser:\n type: none-bin\n" + \
"out:\n type: file\n path_prefix: %s\n file_ext: wen\n formatter:\n type: single_value"
config_string = config_string % (data_in_file_name, data_out_path)
f = open(config,'w')
print >> f,config_string
f.close()
def test_transfer_data(self):
pwd = os.getcwd()
config = pwd + "/config.yml"
data_in_file_name = pwd + "/data-in.wen"
data_out_path = pwd + "/data_out"
data_in = "It works"
data_in_file = open(data_in_file_name, 'w')
data_in_file.write(data_in)
data_in_file.close()
self.create_embulk_config(config, data_in_file_name, data_out_path)
subprocess.call(['/srv/slapgrid/slappart18/srv/runner/software/f5aec1c37c220000ab1c2ef799aac847/parts/java/bin/java',
'-jar', '/srv/slapgrid/slappart18/srv/runner/software/f5aec1c37c220000ab1c2ef799aac847/parts/embulk/embulk.jar',
'run', config, '-b', '/srv/slapgrid/slappart18/srv/runner/software/f5aec1c37c220000ab1c2ef799aac847/parts/embulkPlugins/plugins'])
data_out_path = data_out_path + "000.00.wen"
data_out_file = open(data_out_path, 'rb')
data_out = data_out_file.read()
data_out_file.close()
self.assertEqual(data_in, base64.b64decode(data_out))
os.remove(config)
os.remove(data_in_file_name)
os.remove(data_out_path)
if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(TransferDataTest)
unittest.TextTestRunner(verbosity=3).run(suite)
......@@ -152,6 +152,11 @@ setup = ${slapos-repository:location}/software/dream/test/
egg = slapos.test.repman
setup = ${slapos-repository:location}/software/repman/test/
[slapos.test.embulk-setup]
<= setup-develop-egg
egg = slapos.test.embulk
setup = ${slapos-repository:location}/software/embulk/test/
[slapos.core-repository]
<= git-clone-repository
repository = https://lab.nexedi.com/nexedi/slapos.core.git
......@@ -264,6 +269,7 @@ extra =
${slapos.test.cloudooo-setup:setup}
${slapos.test.dream-setup:setup}
${slapos.test.repman-setup:setup}
${slapos.test.embulk-setup:setup}
[versions]
# slapos.core is used from the clone always
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment