Commit d0bb606d authored by Jérome Perrin's avatar Jérome Perrin

software/slapos-master/test: port the tests to python3

parent b833c90e
......@@ -8,12 +8,11 @@ import shutil
import subprocess
import tempfile
import time
import urllib
import urlparse
from BaseHTTPServer import BaseHTTPRequestHandler
import urllib.parse
from http.server import BaseHTTPRequestHandler
from typing import Dict
import mock
from unittest import mock
import OpenSSL.SSL
import pexpect
import psutil
......@@ -44,10 +43,10 @@ class EchoHTTPServer(ManagedHTTPServer):
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': self.headers.dict
'Incoming Headers': dict(self.headers.items()),
},
indent=2,
)
).encode('utf-8')
self.end_headers()
self.wfile.write(response)
......@@ -67,11 +66,11 @@ class EchoHTTP11Server(ManagedHTTPServer):
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': self.headers.dict
'Incoming Headers': dict(self.headers.items()),
},
indent=2,
)
self.send_header("Content-Length", len(response))
).encode('utf-8')
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
......@@ -100,7 +99,7 @@ class CaucaseService(ManagedResource):
os.mkdir(os.path.join(caucased_dir, 'user'))
os.mkdir(os.path.join(caucased_dir, 'service'))
backend_caucased_netloc = '%s:%s' % (self._cls._ipv4_address, findFreeTCPPort(self._cls._ipv4_address))
backend_caucased_netloc = f'{self._cls._ipv4_address}:{findFreeTCPPort(self._cls._ipv4_address)}'
self.url = 'http://' + backend_caucased_netloc
self._caucased_process = subprocess.Popen(
[
......@@ -110,6 +109,7 @@ class CaucaseService(ManagedResource):
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
# capture subprocess output not to pollute test's own stdout
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
......@@ -127,6 +127,7 @@ class CaucaseService(ManagedResource):
# type: () -> None
self._caucased_process.terminate()
self._caucased_process.wait()
self._caucased_process.stdout.close()
shutil.rmtree(self.directory)
......@@ -166,10 +167,11 @@ class BalancerTestCase(ERP5InstanceTestCase):
'backend-path-dict': {
'default': '',
},
'ssl-authentication-dict': {},
'ssl-authentication-dict': {'default': False},
'ssl': {
'caucase-url': cls.getManagedResource("caucase", CaucaseService).url,
},
'timeout-dict': {'default': None},
'family-path-routing-dict': {},
'path-routing-list': [],
}
......@@ -185,18 +187,27 @@ class BalancerTestCase(ERP5InstanceTestCase):
class SlowHTTPServer(ManagedHTTPServer):
"""An HTTP Server which reply after 2 seconds.
"""An HTTP Server which reply after a timeout.
Timeout is 2 seconds by default, and can be specified in the path of the URL
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
self.send_response(200)
self.send_header("Content-Type", "text/plain")
time.sleep(2)
timeout = 2
try:
timeout = int(self.path[1:])
except ValueError:
pass
time.sleep(timeout)
self.end_headers()
self.wfile.write("OK\n")
self.wfile.write(b"OK\n")
log_message = logging.getLogger(__name__ + '.SlowHTTPServer').info
log_message = logging.getLogger(__name__ + '.SlowHandler').info
class TestLog(BalancerTestCase, CrontabMixin):
......@@ -206,7 +217,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestLog, cls)._getInstanceParameterDict()
parameter_dict = super()._getInstanceParameterDict()
# use a slow server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]]
return parameter_dict
......@@ -214,7 +225,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
def test_access_log_format(self):
# type: () -> None
requests.get(
urlparse.urljoin(self.default_balancer_url, '/url_path'),
urllib.parse.urljoin(self.default_balancer_url, '/url_path'),
verify=False,
)
time.sleep(.5) # wait a bit more until access is logged
......@@ -254,7 +265,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
'apachedex',
'ApacheDex-*.html',
))
with open(apachedex_report, 'r') as f:
with open(apachedex_report) as f:
report_text = f.read()
self.assertIn('APacheDEX', report_text)
# having this table means that apachedex could parse some lines.
......@@ -301,7 +312,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
error_line = error_log_file.read().splitlines()[-1]
self.assertIn('apache.conf -D FOREGROUND', error_line)
# this log also include a timestamp
# This regex is for haproxy mostly, so keep it commented for now, until we can
# This regex is for haproxy mostly, so keep it commented for now, until we can
# Merge the slapos-master setup and erp5.
# self.assertRegexpMatches(error_line, r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}')
......@@ -331,7 +342,7 @@ class BalancerCookieHTTPServer(ManagedHTTPServer):
# The name of this cookie is SERVERID
assert self.headers['X-Balancer-Current-Cookie'] == 'SERVERID'
self.end_headers()
self.wfile.write(server._name)
self.wfile.write(server._name.encode('utf-8'))
log_message = logging.getLogger(__name__ + '.BalancerCookieHTTPServer').info
return RequestHandler
......@@ -344,7 +355,7 @@ class TestBalancer(BalancerTestCase):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestBalancer, cls)._getInstanceParameterDict()
parameter_dict = super()._getInstanceParameterDict()
# use two backend servers
parameter_dict['dummy_http_server'] = [
......@@ -373,7 +384,7 @@ class TestBalancer(BalancerTestCase):
# if backend provides a "SERVERID" cookie, balancer will overwrite it with the
# backend selected by balancing algorithm
self.assertIn(
requests.get(urlparse.urljoin(self.default_balancer_url, '/set_cookie'), verify=False).cookies['SERVERID'],
requests.get(urllib.parse.urljoin(self.default_balancer_url, '/set_cookie'), verify=False).cookies['SERVERID'],
('default-0', 'default-1'),
)
......@@ -400,10 +411,7 @@ class TestTestRunnerEntryPoints(BalancerTestCase):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(
TestTestRunnerEntryPoints,
cls,
)._getInstanceParameterDict()
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['dummy_http_server-test-runner-address-list'] = [
[
......@@ -427,18 +435,18 @@ class TestTestRunnerEntryPoints(BalancerTestCase):
)['default-test-runner-url-list']
url_0, url_1, url_2 = test_runner_url_list
self.assertEqual(
urlparse.urlparse(url_0).netloc,
urlparse.urlparse(url_1).netloc)
urllib.parse.urlparse(url_0).netloc,
urllib.parse.urlparse(url_1).netloc)
self.assertEqual(
urlparse.urlparse(url_0).netloc,
urlparse.urlparse(url_2).netloc)
urllib.parse.urlparse(url_0).netloc,
urllib.parse.urlparse(url_2).netloc)
path_0 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_0/something'.format(
netloc=urlparse.urlparse(url_0).netloc)
netloc=urllib.parse.urlparse(url_0).netloc)
path_1 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_1/something'.format(
netloc=urlparse.urlparse(url_0).netloc)
netloc=urllib.parse.urlparse(url_0).netloc)
path_2 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_2/something'.format(
netloc=urlparse.urlparse(url_0).netloc)
netloc=urllib.parse.urlparse(url_0).netloc)
self.assertEqual(
{
......@@ -476,7 +484,7 @@ class TestHTTP(BalancerTestCase):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestHTTP, cls)._getInstanceParameterDict()
parameter_dict = super()._getInstanceParameterDict()
# use a HTTP/1.1 server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("HTTP/1.1 Server", EchoHTTP11Server).netloc, 1, False]]
return parameter_dict
......@@ -497,32 +505,33 @@ class TestHTTP(BalancerTestCase):
'%{http_version}',
self.default_balancer_url,
]),
'1.1',
b'1.1',
)
def test_keep_alive(self):
# type: () -> None
# when doing two requests, connection is established only once
session = requests.Session()
session.verify = False
with requests.Session() as session:
session.verify = False
# do a first request, which establish a first connection
session.get(self.default_balancer_url).raise_for_status()
# "break" new connection method and check we can make another request
with mock.patch(
"requests.packages.urllib3.connectionpool.HTTPSConnectionPool._new_conn",
) as new_conn:
# do a first request, which establish a first connection
session.get(self.default_balancer_url).raise_for_status()
new_conn.assert_not_called()
parsed_url = urlparse.urlparse(self.default_balancer_url)
# check that we have an open file for the ip connection
self.assertTrue([
c for c in psutil.Process(os.getpid()).connections()
if c.status == 'ESTABLISHED' and c.raddr.ip == parsed_url.hostname
and c.raddr.port == parsed_url.port
])
# "break" new connection method and check we can make another request
with mock.patch(
"requests.packages.urllib3.connectionpool.HTTPSConnectionPool._new_conn",
) as new_conn:
session.get(self.default_balancer_url).raise_for_status()
new_conn.assert_not_called()
parsed_url = urllib.parse.urlparse(self.default_balancer_url)
# check that we have an open file for the ip connection
self.assertTrue([
c for c in psutil.Process(os.getpid()).connections()
if c.status == 'ESTABLISHED' and c.raddr.ip == parsed_url.hostname
and c.raddr.port == parsed_url.port
])
class ContentTypeHTTPServer(ManagedHTTPServer):
......@@ -539,12 +548,12 @@ class ContentTypeHTTPServer(ManagedHTTPServer):
# type: () -> None
self.send_response(200)
if self.path == '/':
self.send_header("Content-Length", 0)
self.send_header("Content-Length", '0')
return self.end_headers()
content_type = self.path[1:]
body = "OK"
body = b"OK"
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", len(body))
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
......@@ -558,7 +567,7 @@ class TestContentEncoding(BalancerTestCase):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestContentEncoding, cls)._getInstanceParameterDict()
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("content_type_server", ContentTypeHTTPServer).netloc, 1, False],
]
......@@ -588,19 +597,19 @@ class TestContentEncoding(BalancerTestCase):
'application/x-font-opentype',
'application/wasm',):
resp = requests.get(
urlparse.urljoin(self.default_balancer_url, content_type),
urllib.parse.urljoin(self.default_balancer_url, content_type),
verify=False,
headers={"Accept-Encoding": "gzip, deflate",})
self.assertEqual(resp.headers['Content-Type'], content_type)
self.assertEqual(
resp.headers.get('Content-Encoding'),
'gzip',
'%s uses wrong encoding: %s' % (content_type, resp.headers.get('Content-Encoding')))
'{} uses wrong encoding: {}'.format(content_type, resp.headers.get('Content-Encoding')))
self.assertEqual(resp.text, 'OK')
def test_no_gzip_encoding(self):
# type: () -> None
resp = requests.get(urlparse.urljoin(self.default_balancer_url, '/image/png'), verify=False)
resp = requests.get(urllib.parse.urljoin(self.default_balancer_url, '/image/png'), verify=False)
self.assertNotIn('Content-Encoding', resp.headers)
self.assertEqual(resp.text, 'OK')
......@@ -683,7 +692,7 @@ class CaucaseCertificate(ManagedResource):
cas_args + [
'--send-csr', self.csr_file,
],
).split()[0]
).split()[0].decode()
assert csr_id
for _ in range(30):
......@@ -699,8 +708,8 @@ class CaucaseCertificate(ManagedResource):
time.sleep(1)
else:
raise RuntimeError('getting service certificate failed.')
with open(self.cert_file) as f:
assert 'BEGIN CERTIFICATE' in f.read()
with open(self.cert_file) as cert_file:
assert 'BEGIN CERTIFICATE' in cert_file.read()
def revoke(self, caucase):
# type: (str, CaucaseService) -> None
......@@ -724,8 +733,8 @@ class TestServerTLSProvidedCertificate(BalancerTestCase):
# type: () -> Dict
server_caucase = cls.getManagedResource('server_caucase', CaucaseService)
server_certificate = cls.getManagedResource('server_certificate', CaucaseCertificate)
server_certificate.request(cls._ipv4_address.decode(), server_caucase)
parameter_dict = super(TestServerTLSProvidedCertificate, cls)._getInstanceParameterDict()
server_certificate.request(cls._ipv4_address, server_caucase)
parameter_dict = super()._getInstanceParameterDict()
with open(server_certificate.cert_file) as f:
parameter_dict['ssl']['cert'] = f.read()
with open(server_certificate.key_file) as f:
......
......@@ -28,11 +28,11 @@
import os
import json
import glob
import urlparse
import urllib.parse
import socket
import time
import re
import BaseHTTPServer
import http.server
import multiprocessing
import subprocess
......@@ -44,7 +44,7 @@ from . import setUpModule
setUpModule # pyflakes
class TestPublishedURLIsReachableMixin(object):
class TestPublishedURLIsReachableMixin:
"""Mixin that checks that default page of ERP5 is reachable.
"""
......@@ -52,7 +52,7 @@ class TestPublishedURLIsReachableMixin(object):
# We access ERP5 trough a "virtual host", which should make
# ERP5 produce URLs using https://virtual-host-name:1234/virtual_host_root
# as base.
virtual_host_url = urlparse.urljoin(
virtual_host_url = urllib.parse.urljoin(
base_url,
'/VirtualHostBase/https/virtual-host-name:1234/{}/VirtualHostRoot/_vh_virtual_host_root/'
.format(site_id))
......@@ -72,20 +72,20 @@ class TestPublishedURLIsReachableMixin(object):
total=60,
backoff_factor=.5,
status_forcelist=(404, 500, 503))))
r = session.get(virtual_host_url, verify=verify, allow_redirects=False)
self.assertEqual(r.status_code, requests.codes.found)
# access on / are redirected to login form, with virtual host preserved
self.assertEqual(r.headers.get('location'), 'https://virtual-host-name:1234/virtual_host_root/login_form')
# login page can be rendered and contain the text "ERP5"
r = session.get(
urlparse.urljoin(base_url, '{}/login_form'.format(site_id)),
verify=verify,
allow_redirects=False,
)
self.assertEqual(r.status_code, requests.codes.ok)
self.assertIn("ERP5", r.text)
with session:
r = session.get(virtual_host_url, verify=verify, allow_redirects=False)
self.assertEqual(r.status_code, requests.codes.found)
# access on / are redirected to login form, with virtual host preserved
self.assertEqual(r.headers.get('location'), 'https://virtual-host-name:1234/virtual_host_root/login_form')
# login page can be rendered and contain the text "ERP5"
r = session.get(
urllib.parse.urljoin(base_url, f'{site_id}/login_form'),
verify=verify,
allow_redirects=False,
)
self.assertEqual(r.status_code, requests.codes.ok)
self.assertIn("ERP5", r.text)
def test_published_family_default_v6_is_reachable(self):
"""Tests the IPv6 URL published by the root partition is reachable.
......@@ -134,7 +134,7 @@ class TestJupyter(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
param_dict = self.getRootPartitionConnectionParameterDict()
self.assertEqual(
'https://[%s]:8888/tree' % self._ipv6_address,
f'https://[{self._ipv6_address}]:8888/tree',
param_dict['jupyter-url']
)
......@@ -172,7 +172,7 @@ class TestApacheBalancerPorts(ERP5InstanceTestCase):
}
def checkValidHTTPSURL(self, url):
parsed = urlparse.urlparse(url)
parsed = urllib.parse.urlparse(url)
self.assertEqual(parsed.scheme, 'https')
self.assertTrue(parsed.hostname)
self.assertTrue(parsed.port)
......@@ -182,16 +182,16 @@ class TestApacheBalancerPorts(ERP5InstanceTestCase):
param_dict = self.getRootPartitionConnectionParameterDict()
for family_name in ('family1', 'family2'):
self.checkValidHTTPSURL(
param_dict['family-{family_name}'.format(family_name=family_name)])
param_dict[f'family-{family_name}'])
self.checkValidHTTPSURL(
param_dict['family-{family_name}-v6'.format(family_name=family_name)])
param_dict[f'family-{family_name}-v6'])
def test_published_test_runner_url(self):
# each family's also a list of test test runner URLs, by default 3 per family
param_dict = self.getRootPartitionConnectionParameterDict()
for family_name in ('family1', 'family2'):
family_test_runner_url_list = param_dict[
'{family_name}-test-runner-url-list'.format(family_name=family_name)]
f'{family_name}-test-runner-url-list']
self.assertEqual(3, len(family_test_runner_url_list))
for url in family_test_runner_url_list:
self.checkValidHTTPSURL(url)
......@@ -209,23 +209,23 @@ class TestApacheBalancerPorts(ERP5InstanceTestCase):
# normal access on ipv4 and ipv6 and test runner access on ipv4 only
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
process_info, = [p for p in all_process_info if p['name'] == 'apache']
process_info, = (p for p in all_process_info if p['name'] == 'apache')
apache_process = psutil.Process(process_info['pid'])
self.assertEqual(
sorted([socket.AF_INET] * 4 + [socket.AF_INET6] * 2),
sorted([
sorted(
c.family
for c in apache_process.connections()
if c.status == 'LISTEN'
]))
))
def test_haproxy_listen(self):
# There is one haproxy per family
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
process_info, = [
process_info, = (
p for p in all_process_info if p['name'].startswith('haproxy-')
]
)
haproxy_process = psutil.Process(process_info['pid'])
self.assertEqual([socket.AF_INET, socket.AF_INET], [
c.family for c in haproxy_process.connections() if c.status == 'LISTEN'
......@@ -290,8 +290,8 @@ class TestZopeNodeParameterOverride(ERP5InstanceTestCase, TestPublishedURLIsReac
zodb["pool-timeout"] = "10m"
storage["storage"] = "root"
storage["server"] = zeo_addr
with open('%s/etc/zope-%s.conf' % (partition, zope)) as f:
conf = map(str.strip, f.readlines())
with open(f'{partition}/etc/zope-{zope}.conf') as f:
conf = list(map(str.strip, f.readlines()))
i = conf.index("<zodb_db root>") + 1
conf = iter(conf[i:conf.index("</zodb_db>", i)])
for line in conf:
......@@ -300,23 +300,23 @@ class TestZopeNodeParameterOverride(ERP5InstanceTestCase, TestPublishedURLIsReac
if line == '</zeoclient>':
break
checkParameter(line, storage)
for k, v in storage.iteritems():
for k, v in storage.items():
self.assertIsNone(v, k)
del storage
else:
checkParameter(line, zodb)
for k, v in zodb.iteritems():
for k, v in zodb.items():
self.assertIsNone(v, k)
partition = self.getComputerPartitionPath('zope-a')
for zope in xrange(3):
for zope in range(3):
checkConf({
"cache-size-bytes": "20MB",
}, {
"cache-size": "50MB",
})
partition = self.getComputerPartitionPath('zope-bb')
for zope in xrange(5):
for zope in range(5):
checkConf({
"cache-size-bytes": "500MB" if zope else 1<<20,
}, {
......@@ -332,19 +332,20 @@ def popenCommunicate(command_list, input_=None, **kwargs):
popen.kill()
if popen.returncode != 0:
raise ValueError(
'Issue during calling %r, result was:\n%s' % (command_list, result))
f'Issue during calling {command_list!r}, result was:\n{result}')
return result
class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
class TestHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
response = {
'Path': self.path,
'Incoming Headers': self.headers.dict
}
response = json.dumps(response, indent=2)
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': {k.lower(): v for k, v in self.headers.items()},
},
indent=2,
).encode('utf-8')
self.end_headers()
self.wfile.write(response)
......@@ -352,7 +353,7 @@ class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
class TestDeploymentScriptInstantiation(ERP5InstanceTestCase):
"""This check deployment script like instantiation
Low level assertions are done here in roder to assure that
Low level assertions are done here in order to assure that
https://lab.nexedi.com/nexedi/slapos.package/blob/master/playbook/
slapos-master-standalone.yml
works correctly
......@@ -426,8 +427,8 @@ class TestDeploymentScriptInstantiation(ERP5InstanceTestCase):
1,
len(backend_apache_configuration_list)
)
backend_apache_configuration = open(
backend_apache_configuration_list[0]).read()
with open(backend_apache_configuration_list[0]) as f:
backend_apache_configuration = f.read()
self.assertIn(
'SSLVerifyClient require',
backend_apache_configuration
......@@ -452,7 +453,7 @@ class TestDeploymentScriptInstantiation(ERP5InstanceTestCase):
common_name = 'TEST-SSL-AUTH'
popenCommunicate([
'openssl', 'req', '-utf8', '-nodes', '-config', openssl_config, '-new',
'-keyout', key, '-out', csr, '-days', '3650'], '%s\n' % (common_name,),
'-keyout', key, '-out', csr, '-days', '3650'], f'{common_name}\n'.encode(),
stdin=subprocess.PIPE)
popenCommunicate([
'openssl', 'ca', '-utf8', '-days', '3650', '-batch', '-config',
......@@ -464,24 +465,24 @@ class TestDeploymentScriptInstantiation(ERP5InstanceTestCase):
ip, port = re.search(
r'.*http:\/\/(.*):(\d*)\/.*', portal_slap_line).groups()
port = int(port)
server = BaseHTTPServer.HTTPServer((ip, port), TestHandler)
server = http.server.HTTPServer((ip, port), TestHandler)
server_process = multiprocessing.Process(
target=server.serve_forever, name='HTTPServer')
server_process.start()
try:
# assert that accessing the service endpoint results with certificate
# authentication and proper information extraction
result_json = requests.get(
self.getRootPartitionConnectionParameterDict()['family-service'],
verify=False, cert=(cert, key)).json()
self.assertEqual(
common_name,
result_json['Incoming Headers']['remote-user']
)
self.assertEqual(
'/erp5/portal_slap/',
result_json['Path']
)
finally:
server_process.join(10)
server_process.terminate()
self.addCleanup(server_process.terminate)
self.addCleanup(server_process.join, 10)
server.socket.close()
# assert that accessing the service endpoint results with certificate
# authentication and proper information extraction
result_json = requests.get(
self.getRootPartitionConnectionParameterDict()['family-service'],
verify=False, cert=(cert, key)).json()
self.assertEqual(
common_name,
result_json['Incoming Headers']['remote-user']
)
self.assertEqual(
'/erp5/portal_slap/',
result_json['Path']
)
##############################################################################
# coding: utf-8
#
# Copyright (c) 2018 Nexedi SA and Contributors. All Rights Reserved.
#
......@@ -29,7 +28,7 @@
import os
import json
import glob
import urlparse
import urllib.parse
import socket
import sys
import time
......@@ -38,7 +37,7 @@ import datetime
import subprocess
import gzip
from backports import lzma
import lzma
import MySQLdb
from slapos.testing.utils import CrontabMixin
......@@ -80,7 +79,7 @@ class MariaDBTestCase(ERP5InstanceTestCase):
def getDatabaseConnection(self):
connection_parameter_dict = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])
db_url = urlparse.urlparse(connection_parameter_dict['database-list'][0])
db_url = urllib.parse.urlparse(connection_parameter_dict['database-list'][0])
self.assertEqual('mysql', db_url.scheme)
self.assertTrue(db_url.path.startswith('/'))
......@@ -91,6 +90,8 @@ class MariaDBTestCase(ERP5InstanceTestCase):
host=db_url.hostname,
port=db_url.port,
db=database_name,
use_unicode=True,
charset='utf8mb4'
)
......@@ -106,7 +107,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
'mariadb-full',
'20500101000000.sql.gz',
),
'r') as dump:
'rt') as dump:
self.assertIn('CREATE TABLE', dump.read())
def test_logrotate_and_slow_query_digest(self):
......@@ -148,7 +149,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
'slowquery_digest',
'slowquery_digest.txt-2050-01-01.xz',
)
with lzma.open(slow_query_report, 'r') as f:
with lzma.open(slow_query_report, 'rt') as f:
# this is the hash for our "select sleep(n)" slow query
self.assertIn("ID 0xF9A57DD5A41825CA", f.read())
......@@ -170,7 +171,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
subprocess.check_output('faketime 2050-01-01 %s' % check_slow_query_promise_plugin['command'], shell=True)
self.assertEqual(
error_context.exception.output,
"""\
b"""\
Threshold is lower than expected:
Expected total queries : 1.0 and current is: 2
Expected slowest query : 0.1 and current is: 3
......@@ -220,7 +221,7 @@ class TestMroonga(MariaDBTestCase):
"""
SELECT mroonga_normalize("ABCDあぃうぇ㍑")
""")
self.assertEqual((('abcdあぃうぇリットル',),),
self.assertEqual((('abcdあぃうぇリットル'.encode(),),),
cnx.store_result().fetch_row(maxrows=2))
if 0:
......@@ -233,7 +234,7 @@ class TestMroonga(MariaDBTestCase):
"""
SELECT mroonga_normalize("aBcDあぃウェ㍑", "NormalizerMySQLUnicodeCIExceptKanaCIKanaWithVoicedSoundMark")
""")
self.assertEqual((('ABCDあぃうぇ㍑',),),
self.assertEqual((('ABCDあぃうぇ㍑'.encode(),),),
cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_normalizer(self):
......@@ -321,7 +322,7 @@ class TestMroonga(MariaDBTestCase):
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("SELECT mroonga_command('register token_filters/stem')")
self.assertEqual((('true',),), cnx.store_result().fetch_row(maxrows=2))
self.assertEqual(((b'true',),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
CREATE TABLE memos (
......
......@@ -75,5 +75,5 @@ def lookupMount(zurl):
# readfile returns content of file @path.
def readfile(path):
with open(path, 'r') as f:
with open(path) as f:
return f.read()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment