Commit 70111cf7 authored by Jérome Perrin's avatar Jérome Perrin

software/erp5/test: test zope behavior

  - access log
    - log rotation
    - username is logged (expected failure in WSGI)
  - event log
    - log rotation
  - long requests log (expected failure in WSGI)
    - log rotation
  - activity processing
  - deadlock debugger
parent bdea823b
......@@ -27,19 +27,26 @@
from __future__ import absolute_import
import contextlib
import glob
import json
import os
import shutil
import socket
import ssl
import subprocess
import sys
import tempfile
import time
import unittest
import psutil
import requests
import six
import six.moves.urllib.parse
import six.moves.xmlrpc_client
import urllib3
from slapos.testing.utils import CrontabMixin
from . import ERP5InstanceTestCase, setUpModule
......@@ -434,3 +441,400 @@ class TestWatchActivities(ERP5InstanceTestCase):
except subprocess.CalledProcessError as e:
self.fail(e.output)
self.assertIn(' dict ', output)
class ZopeTestMixin(CrontabMixin):
"""Mixin class for zope features.
"""
wsgi = NotImplemented # type: bool
__partition_reference__ = 'z'
@classmethod
def getInstanceParameterDict(cls):
return {
'_':
json.dumps({
"zope-partition-dict": {
"default": {
"longrequest-logger-interval": 1,
"longrequest-logger-timeout": 1,
},
},
"wsgi": cls.wsgi,
})
}
@classmethod
def _setUpClass(cls):
super(ZopeTestMixin, cls)._setUpClass()
param_dict = cls.getRootPartitionConnectionParameterDict()
# rebuild an url with user and password
parsed = six.moves.urllib.parse.urlparse(param_dict['family-default'])
cls.zope_base_url = parsed._replace(
netloc='{}:{}@{}:{}'.format(
param_dict['inituser-login'],
param_dict['inituser-password'],
parsed.hostname,
parsed.port,
),
path=param_dict['site-id'] + '/',
).geturl()
cls.zope_deadlock_debugger_url = six.moves.urllib_parse.urljoin(
cls.zope_base_url,
'/manage_debug_threads?{deadlock-debugger-password}'.format(
**param_dict),
)
@contextlib.contextmanager
def getXMLRPCClient():
# don't verify certificate
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
erp5_xmlrpc_client = six.moves.xmlrpc_client.ServerProxy(
cls.zope_base_url,
context=ssl_context,
)
# BBB use as a context manager only on python3
if sys.version_info < (3, ):
yield erp5_xmlrpc_client
else:
with erp5_xmlrpc_client:
yield erp5_xmlrpc_client
with getXMLRPCClient() as erp5_xmlrpc_client:
# wait for ERP5 to be ready (TODO: this should probably be a promise)
for _ in range(120):
time.sleep(1)
try:
erp5_xmlrpc_client.getTitle()
except (six.moves.xmlrpc_client.ProtocolError,
six.moves.xmlrpc_client.Fault):
pass
else:
break
def addPythonScript(script_id, params, body):
with getXMLRPCClient() as erp5_xmlrpc_client:
custom = erp5_xmlrpc_client.portal_skins.custom
try:
custom.manage_addProduct.PythonScripts.manage_addPythonScript(
script_id)
except six.moves.xmlrpc_client.ProtocolError as e:
if e.errcode != 302:
raise
getattr(custom, script_id).ZPythonScriptHTML_editAction(
'',
'',
params,
body,
)
# a python script to verify activity processing
addPythonScript(
script_id='ERP5Site_verifyActivityProcessing',
params='mode',
body='''if 1:
import json
portal = context.getPortalObject()
if mode == "count":
return json.dumps(dict(count=len(portal.portal_activities.getMessageList())))
if mode == "activate":
for _ in range(10):
portal.portal_templates.activate(activity="SQLQueue").getTitle()
return "activated"
raise ValueError("Unknown mode: %s" % mode)
''',
)
cls.zope_verify_activity_processing_url = six.moves.urllib_parse.urljoin(
cls.zope_base_url,
'ERP5Site_verifyActivityProcessing',
)
# a python script logging to event log
addPythonScript(
script_id='ERP5Site_logMessage',
params='name',
body='''if 1:
from erp5.component.module.Log import log
return log("hello %s" % name)
''',
)
cls.zope_log_message_url = six.moves.urllib_parse.urljoin(
cls.zope_base_url,
'ERP5Site_logMessage',
)
# a python script issuing a long request
addPythonScript(
script_id='ERP5Site_executeLongRequest',
params='',
body='''if 1:
import time
for _ in range(5):
time.sleep(1)
return "done"
''',
)
cls.zope_long_request_url = six.moves.urllib_parse.urljoin(
cls.zope_base_url,
'ERP5Site_executeLongRequest',
)
def setUp(self):
# run logrotate a first time so that it create state files
self._executeCrontabAtDate('logrotate', '2000-01-01')
def tearDown(self):
# reset logrotate status
logrotate_status = os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'logrotate.status',
)
if os.path.exists(logrotate_status):
os.unlink(logrotate_status)
for logfile in glob.glob(
os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'backup',
'logrotate',
'*',
)):
os.unlink(logfile)
for logfile in glob.glob(
os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'monitor',
'private',
'documents',
'*',
)):
os.unlink(logfile)
def _getCrontabCommand(self, crontab_name):
# type: (str) -> str
"""Read a crontab and return the command that is executed.
overloaded to use crontab from zope partition
"""
with open(
os.path.join(
self.getComputerPartitionPath('zope-default'),
'etc',
'cron.d',
crontab_name,
)) as f:
crontab_spec, = f.readlines()
self.assertNotEqual(crontab_spec[0], '@', crontab_spec)
return crontab_spec.split(None, 5)[-1]
def test_event_log_rotation(self):
requests.get(
self.zope_log_message_url,
params={
"name": "world"
},
verify=False,
).raise_for_status()
zope_event_log_path = os.path.join(
self.getComputerPartitionPath('zope-default'),
'var',
'log',
'zope-0-event.log',
)
with open(zope_event_log_path) as f:
self.assertIn('hello world', f.read())
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'backup',
'logrotate',
'zope-0-event.log-20500101',
)
with open(rotated_log_file) as f:
self.assertIn('hello world', f.read())
requests.get(
self.zope_log_message_url,
params={
"name": "le monde"
},
verify=False,
).raise_for_status()
with open(zope_event_log_path) as f:
self.assertNotIn('hello world', f.read())
with open(zope_event_log_path) as f:
self.assertIn('hello le monde', f.read())
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
def test_access_log_rotation(self):
requests.get(
self.zope_base_url,
verify=False,
headers={
'User-Agent': 'before rotation'
},
).raise_for_status()
zope_access_log_path = os.path.join(
self.getComputerPartitionPath('zope-default'),
'var',
'log',
'zope-0-Z2.log',
)
with open(zope_access_log_path) as f:
self.assertIn('before rotation', f.read())
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'backup',
'logrotate',
'zope-0-Z2.log-20500101',
)
with open(rotated_log_file) as f:
self.assertIn('before rotation', f.read())
requests.get(
self.zope_base_url,
verify=False,
headers={
'User-Agent': 'after rotation'
},
).raise_for_status()
with open(zope_access_log_path) as f:
self.assertNotIn('before rotation', f.read())
with open(zope_access_log_path) as f:
self.assertIn('after rotation', f.read())
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
def test_long_request_log_rotation(self):
requests.get(self.zope_long_request_url,
verify=False,
params={
'when': 'before rotation'
}).raise_for_status()
zope_long_request_log_path = os.path.join(
self.getComputerPartitionPath('zope-default'),
'var',
'log',
'longrequest_logger_zope-0.log',
)
with open(zope_long_request_log_path) as f:
self.assertIn('before rotation', f.read())
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.getComputerPartitionPath('zope-default'),
'srv',
'backup',
'logrotate',
'longrequest_logger_zope-0.log-20500101',
)
with open(rotated_log_file) as f:
self.assertIn('before rotation', f.read())
requests.get(
self.zope_long_request_url,
verify=False,
params={
'when': 'after rotation'
},
).raise_for_status()
with open(zope_long_request_log_path) as f:
self.assertNotIn('before rotation', f.read())
with open(zope_long_request_log_path) as f:
self.assertIn('after rotation', f.read())
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
def test_basic_authentication_user_in_access_log(self):
param_dict = self.getRootPartitionConnectionParameterDict()
requests.get(self.zope_base_url,
verify=False,
auth=requests.auth.HTTPBasicAuth(
param_dict['inituser-login'],
param_dict['inituser-password'],
)).raise_for_status()
zope_access_log_path = os.path.join(
self.getComputerPartitionPath('zope-default'),
'var',
'log',
'zope-0-Z2.log',
)
with open(zope_access_log_path) as f:
self.assertIn(param_dict['inituser-login'], f.read())
def test_deadlock_debugger(self):
dump_response = requests.get(
self.zope_deadlock_debugger_url,
verify=False,
)
dump_response.raise_for_status()
self.assertIn('Thread ', dump_response.text)
def test_activity_processing(self):
requests.get(
self.zope_verify_activity_processing_url,
params={
'mode': 'activate'
},
verify=False,
).raise_for_status()
for retry in range(60):
time.sleep(10)
count = requests.get(
self.zope_verify_activity_processing_url,
params={
'mode': 'count',
'retry': retry,
},
verify=False,
).json()['count']
if not count:
break
else:
self.assertEqual(count, 0)
class TestZopeMedusa(ZopeTestMixin, ERP5InstanceTestCase):
wsgi = False
class TestZopeWSGI(ZopeTestMixin, ERP5InstanceTestCase):
wsgi = True
@unittest.expectedFailure
def test_long_request_log_rotation(self):
super(TestZopeWSGI, self).test_long_request_log_rotation(self)
@unittest.expectedFailure
def test_basic_authentication_user_in_access_log(self):
super(TestZopeWSGI, self).test_basic_authentication_user_in_access_log(self)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment