Commit 7c31c744 authored by Jérome Perrin's avatar Jérome Perrin

LongRequestLogger from zope svn which should support mutiple thread according...

LongRequestLogger from zope svn which should support mutiple thread according to Leonardo + hack to log mysql query
parent db55efe9
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
#
##############################################################################
def initialize(context):
from Products.LongRequestLogger.dumper import do_enable
from Products.LongRequestLogger.patch import do_patch
if do_enable():
# if not enabled on startup, it won't be enabled, period.
# it can be disabled at runtime afterwards, but will not be unpatched.
do_patch()
##############################################################################
#
# Copyright (c) 2010,2012 Zope Foundation and Contributors.
#
##############################################################################
from cStringIO import StringIO
from pprint import pformat
from thread import get_ident
import Signals.Signals
import ZConfig.components.logger.loghandler
import ZServer.BaseLogger
import logging
import os
import os.path
import time
import traceback
import sys
try:
from signal import SIGUSR2
except ImportError:
# Windows doesn't have these (but also doesn't care what the exact
# numbers are)
SIGUSR2 = 12
try:
sys._current_frames
except AttributeError:
# Python 2.4 and older
import threadframe
sys._current_frames = threadframe.dict
class NullHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
# for comparison purposes below
self.baseFilename = 'null'
def emit(self, *args, **kw):
pass
# we might want to change this name later to something more specific
logger_name = __name__
log = logging.getLogger(logger_name)
log.propagate = False
handler = NullHandler()
log.addHandler(handler)
formatter = logging.Formatter("%(asctime)s - %(message)s")
DEFAULT_TIMEOUT = 2
DEFAULT_INTERVAL = 1
def do_enable():
global handler
# this function is not exactly thread-safe, but it shouldn't matter.
# The worse that can happen is that a change in longrequestlogger_file
# will stop or change the logging destination of an already running request
logfile = os.environ.get('longrequestlogger_file')
if not logfile:
return None # so that Dumpers know they are disabled
if logfile != 'null':
# to imitate FileHandler
logfile = os.path.abspath(logfile)
rotate = None
if handler.baseFilename != logfile:
log.removeHandler(handler)
handler.close()
if logfile == 'null':
handler = NullHandler()
elif os.name == 'nt':
rotate = Signals.Signals.LogfileRotateHandler
handler = ZConfig.components.logger.loghandler.Win32FileHandler(
logfile)
else:
rotate = Signals.Signals.LogfileReopenHandler
handler = ZConfig.components.logger.loghandler.FileHandler(
logfile)
handler.formatter = formatter
log.addHandler(handler)
# Register with Zope 2 signal handlers to support log rotation
if rotate and Signals.Signals.SignalHandler:
Signals.Signals.SignalHandler.registerHandler(
SIGUSR2, rotate([handler]))
return log # which is also True as boolean
def get_configuration():
return dict(
timeout=float(os.environ.get('longrequestlogger_timeout',
DEFAULT_TIMEOUT)),
interval=float(os.environ.get('longrequestlogger_interval',
DEFAULT_INTERVAL)),
)
THREAD_FORMAT = "Thread %s: Started on %.1f; Running for %.1f secs; %s"
REQUEST_FORMAT = """
request: %(method)s %(url)s
retry count: %(retries)s
form: %(form)s
other: %(other)s
""".strip()
class Dumper(object):
def __init__(self, thread_id=None):
if thread_id is None:
# assume we're being called by the thread that wants to be
# monitored
thread_id = get_ident()
self.thread_id = thread_id
self.start = time.time()
# capture it here in case it gets disabled in the future
self.log = do_enable()
conf = get_configuration()
self.timeout = conf['timeout']
self.interval = conf['interval']
def is_enabled(self):
return bool(self.log)
def format_request(self, request):
if request is None:
return "[No request]"
url = request.getURL()
if request.get('QUERY_STRING'):
url += '?' + request['QUERY_STRING']
retries = request.retry_count
method = request['REQUEST_METHOD']
form = pformat(request.form)
other = pformat(request.other)
return REQUEST_FORMAT % locals()
def extract_request(self, frame):
# We try to fetch the request from the 'call_object' function because
# it's the one that gets called with retried requests.
# And we import it locally to get even monkey-patched versions of the
# function.
from ZPublisher.Publish import call_object
func_code = call_object.func_code #@UndefinedVariable
while frame is not None:
code = frame.f_code
if (code is func_code):
request = frame.f_locals.get('request')
return request
frame = frame.f_back
def extract_request_info(self, frame):
request = self.extract_request(frame)
return self.format_request(request)
def get_thread_info(self, frame):
request_info = self.extract_request_info(frame)
now = time.time()
runtime = now - self.start
info = THREAD_FORMAT % (self.thread_id,
self.start,
runtime,
request_info)
return info
def format_thread(self):
frame = sys._current_frames()[self.thread_id]
output = StringIO()
thread_info = self.get_thread_info(frame)
print >> output, thread_info
print >> output, "Traceback:"
traceback.print_stack(frame, file=output)
try:
from Products.ZMySQLDA.db import DB
while frame is not None:
code = frame.f_code
if code is DB._query.func_code:
print >> output, "SQL Query:"
print >> output, frame.f_locals['query']
frame = frame.f_back
except ImportError:
pass
del frame
return output.getvalue()
def __call__(self):
self.log.warning(self.format_thread())
##############################################################################
#
# Copyright (c) 2010,2012 Zope Foundation and Contributors.
#
##############################################################################
from cStringIO import StringIO
from pprint import pformat
from thread import get_ident
import Signals.Signals
import ZConfig.components.logger.loghandler
import ZServer.BaseLogger
import logging
import os
import os.path
import time
import traceback
import sys
try:
from signal import SIGUSR2
except ImportError:
# Windows doesn't have these (but also doesn't care what the exact
# numbers are)
SIGUSR2 = 12
try:
sys._current_frames
except AttributeError:
# Python 2.4 and older
import threadframe
sys._current_frames = threadframe.dict
class NullHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
# for comparison purposes below
self.baseFilename = 'null'
def emit(self, *args, **kw):
pass
# we might want to change this name later to something more specific
logger_name = __name__
log = logging.getLogger(logger_name)
log.propagate = False
handler = NullHandler()
log.addHandler(handler)
formatter = logging.Formatter("%(asctime)s - %(message)s")
DEFAULT_TIMEOUT = 2
DEFAULT_INTERVAL = 1
def do_enable():
global handler
# this function is not exactly thread-safe, but it shouldn't matter.
# The worse that can happen is that a change in longrequestlogger_file
# will stop or change the logging destination of an already running request
logfile = os.environ.get('longrequestlogger_file')
if not logfile:
return None # so that Dumpers know they are disabled
if logfile != 'null':
# to imitate FileHandler
logfile = os.path.abspath(logfile)
rotate = None
if handler.baseFilename != logfile:
log.removeHandler(handler)
handler.close()
if logfile == 'null':
handler = NullHandler()
elif os.name == 'nt':
rotate = Signals.Signals.LogfileRotateHandler
handler = ZConfig.components.logger.loghandler.Win32FileHandler(
logfile)
else:
rotate = Signals.Signals.LogfileReopenHandler
handler = ZConfig.components.logger.loghandler.FileHandler(
logfile)
handler.formatter = formatter
log.addHandler(handler)
# Register with Zope 2 signal handlers to support log rotation
if rotate and Signals.Signals.SignalHandler:
Signals.Signals.SignalHandler.registerHandler(
SIGUSR2, rotate([handler]))
return log # which is also True as boolean
def get_configuration():
return dict(
timeout=float(os.environ.get('longrequestlogger_timeout',
DEFAULT_TIMEOUT)),
interval=float(os.environ.get('longrequestlogger_interval',
DEFAULT_INTERVAL)),
)
THREAD_FORMAT = "Thread %s: Started on %.1f; Running for %.1f secs; %s"
REQUEST_FORMAT = """
request: %(method)s %(url)s
retry count: %(retries)s
form: %(form)s
other: %(other)s
""".strip()
class Dumper(object):
def __init__(self, thread_id=None):
if thread_id is None:
# assume we're being called by the thread that wants to be
# monitored
thread_id = get_ident()
self.thread_id = thread_id
self.start = time.time()
# capture it here in case it gets disabled in the future
self.log = do_enable()
conf = get_configuration()
self.timeout = conf['timeout']
self.interval = conf['interval']
def is_enabled(self):
return bool(self.log)
def format_request(self, request):
if request is None:
return "[No request]"
url = request.getURL()
if request.get('QUERY_STRING'):
url += '?' + request['QUERY_STRING']
retries = request.retry_count
method = request['REQUEST_METHOD']
form = pformat(request.form)
other = pformat(request.other)
return REQUEST_FORMAT % locals()
def extract_request(self, frame):
# We try to fetch the request from the 'call_object' function because
# it's the one that gets called with retried requests.
# And we import it locally to get even monkey-patched versions of the
# function.
from ZPublisher.Publish import call_object
func_code = call_object.func_code #@UndefinedVariable
while frame is not None:
code = frame.f_code
if (code is func_code):
request = frame.f_locals.get('request')
return request
frame = frame.f_back
def extract_request_info(self, frame):
request = self.extract_request(frame)
return self.format_request(request)
def get_thread_info(self, frame):
request_info = self.extract_request_info(frame)
now = time.time()
runtime = now - self.start
info = THREAD_FORMAT % (self.thread_id,
self.start,
runtime,
request_info)
return info
def format_thread(self):
frame = sys._current_frames()[self.thread_id]
output = StringIO()
thread_info = self.get_thread_info(frame)
print >> output, thread_info
print >> output, "Traceback:"
traceback.print_stack(frame, file=output)
del frame
return output.getvalue()
def __call__(self):
self.log.warning(self.format_thread())
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
#
##############################################################################
import os
from select import select
from threading import Thread
from Products.LongRequestLogger.dumper import Dumper
class Monitor(Thread):
"""Logs the stack-trace of a thread until it's stopped
m = Monitor(dumper)
Wait dumper.timeout seconds before calling dumper() every
dumper.interval seconds.
m.stop()
Stop the monitoring, whether timed-out or not
"""
dumper = None
def __init__(self, dumper=None):
Thread.__init__(self)
if dumper is None:
dumper = Dumper()
if dumper.is_enabled():
self._event_pipe = os.pipe()
self.dumper = dumper
self.start()
def waiting(self, timeout):
r, _ = self._event_pipe
read_ready_list, _, _ = select([r], [], [], timeout)
if read_ready_list:
os.read(r, 1)
# stop() called by monitored thread.
# Stop waiting:
return False
# Still waiting for the other thread to finish
return True
def stop(self):
"""Stop monitoring the other thread"""
if self.dumper is not None:
r, w = self._event_pipe
os.write(w, '\0')
self.join()
map(os.close, self._event_pipe)
def run(self):
timeout = self.dumper.timeout
while self.waiting(timeout):
self.dumper()
timeout = self.dumper.interval
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
#
##############################################################################
import sys
from logging import getLogger
from Products.LongRequestLogger.monitor import Monitor
log = getLogger(__name__)
def wrapper(*args, **kw):
monitor = Monitor()
try:
result = wrapper.original(*args, **kw)
return result
finally:
monitor.stop()
def do_patch():
from ZPublisher.Publish import publish_module_standard as original
wrapper.original = original
log.info('patching %s.%s' % (wrapper.original.__module__,
wrapper.original.__name__))
setattr(sys.modules[wrapper.original.__module__],
wrapper.original.__name__,
wrapper)
def do_unpatch():
setattr(sys.modules[wrapper.original.__module__],
wrapper.original.__name__,
wrapper.original)
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
#
##############################################################################
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
#
##############################################################################
import time
class Sleeper(object):
"""This class exists solely to inflate the stack trace, and to be in a
file where the stack trace won't be affected by editing of the test file
that uses it.
"""
def __init__(self, interval):
self.interval = interval
def sleep(self):
self._sleep1()
def _sleep1(self):
self._sleep2()
def _sleep2(self):
time.sleep(self.interval)
class App(object):
def __call__(self, interval):
Sleeper(interval).sleep()
return "OK"
# Enable this module to be published with ZPublisher.Publish.publish_module()
bobo_application = App()
##############################################################################
#
# Copyright (c) 2010 Zope Foundation and Contributors.
#
##############################################################################
import sys
import unittest
from cStringIO import StringIO
from doctest import OutputChecker
from doctest import REPORT_UDIFF, NORMALIZE_WHITESPACE, ELLIPSIS
from Products.LongRequestLogger.tests.common import Sleeper
import os
class SimpleOutputChecker(OutputChecker):
# for certain inputs the doctest output checker is much more convenient
# than manually munging assertions, but we don't want to go through all
# the trouble of actually writing doctests.
optionflags = REPORT_UDIFF | NORMALIZE_WHITESPACE | ELLIPSIS
def __init__(self, want, optionflags=None):
self.want = want
if optionflags is not None:
self.optionflags = optionflags
def __call__(self, got):
assert self.check_output(self.want, got, self.optionflags), \
self.output_difference(self, # pretend we're a doctest.Example
got, self.optionflags)
check_dump = SimpleOutputChecker('''
Thread ...: Started on ...; Running for 0.0 secs; [No request]
<BLANKLINE>
Traceback:
...
File ".../LongRequestLogger/dumper.py", line ..., in format_thread
traceback.print_stack(frame, file=output)
''')
check_log = SimpleOutputChecker('''
Products.LongRequestLogger.dumper WARNING
Thread ...: Started on ...; Running for 0.0 secs; [No request]
Traceback:
...
File ".../LongRequestLogger/dumper.py", line ..., in __call__
self.log.warning(self.format_thread())
File ".../LongRequestLogger/dumper.py", line ..., in format_thread
traceback.print_stack(frame, file=output)
''')
check_monitor_log = SimpleOutputChecker('''
Products.LongRequestLogger.dumper WARNING
Thread ...: Started on ...; Running for 2.0 secs; [No request]
Traceback:
...
File ".../LongRequestLogger/tests/common.py", line ..., in sleep
self._sleep1()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep1
self._sleep2()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep2
time.sleep(self.interval)
''')
check_monitor_2_intervals_log = SimpleOutputChecker('''
Products.LongRequestLogger.dumper WARNING
Thread ...: Started on ...; Running for 2.0 secs; [No request]
Traceback:
...
File ".../LongRequestLogger/tests/common.py", line ..., in sleep
self._sleep1()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep1
self._sleep2()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep2
time.sleep(self.interval)
Products.LongRequestLogger.dumper WARNING
Thread ...: Started on ...; Running for 3.0 secs; [No request]
Traceback:
...
File ".../LongRequestLogger/tests/common.py", line ..., in sleep
self._sleep1()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep1
self._sleep2()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep2
time.sleep(self.interval)
Products.LongRequestLogger.dumper WARNING
Thread ...: Started on ...; Running for 4.0 secs; [No request]
Traceback:
...
File ".../LongRequestLogger/tests/common.py", line ..., in sleep
self._sleep1()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep1
self._sleep2()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep2
time.sleep(self.interval)
''')
check_publishing_1_interval_log = SimpleOutputChecker('''
Products.LongRequestLogger.dumper WARNING
Thread ...: Started on ...; Running for 2.0 secs; request: GET http://localhost
retry count: 0
form: {}
other: {'ACTUAL_URL': 'http://localhost',
'PARENTS': [],
'PUBLISHED': <Products.LongRequestLogger.tests.common.App object at 0x...>,
'RESPONSE': HTTPResponse(''),
'SERVER_URL': 'http://localhost',
'TraversalRequestNameStack': [],
'URL': 'http://localhost',
'interval': 3.5,
'method': 'GET'}
Traceback:
...
File ".../LongRequestLogger/patch.py", line ..., in wrapper
result = wrapper.original(*args, **kw)
File ".../ZPublisher/Publish.py", line ..., in publish_module_standard
response = publish(request, module_name, after_list, debug=debug)
...
File ".../LongRequestLogger/tests/common.py", line ..., in __call__
Sleeper(interval).sleep()
File ".../LongRequestLogger/tests/common.py", line ..., in sleep
self._sleep1()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep1
self._sleep2()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep2
time.sleep(self.interval)
Products.LongRequestLogger.dumper WARNING
Thread ...: Started on ...; Running for 3.0 secs; request: GET http://localhost
retry count: 0
form: {}
other: {'ACTUAL_URL': 'http://localhost',
'PARENTS': [],
'PUBLISHED': <Products.LongRequestLogger.tests.common.App object at 0x...>,
'RESPONSE': HTTPResponse(''),
'SERVER_URL': 'http://localhost',
'TraversalRequestNameStack': [],
'URL': 'http://localhost',
'interval': 3.5,
'method': 'GET'}
Traceback:
...
File ".../LongRequestLogger/patch.py", line ..., in wrapper
result = wrapper.original(*args, **kw)
File ".../ZPublisher/Publish.py", line ..., in publish_module_standard
response = publish(request, module_name, after_list, debug=debug)
...
File ".../LongRequestLogger/tests/common.py", line ..., in __call__
Sleeper(interval).sleep()
File ".../LongRequestLogger/tests/common.py", line ..., in sleep
self._sleep1()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep1
self._sleep2()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep2
time.sleep(self.interval)
''')
check_request_formating = SimpleOutputChecker('''
request: GET http://localhost/foo/bar
retry count: 0
form: {}
other: {'RESPONSE': HTTPResponse(''),
'SERVER_URL': 'http://localhost',
'URL': 'http://localhost/foo/bar',
'method': 'GET'}
''')
check_monitor_environment_log = SimpleOutputChecker('''
Products.LongRequestLogger.dumper WARNING
Thread ...: Started on ...; Running for 3.5 secs; [No request]
Traceback:
...
File ".../LongRequestLogger/tests/common.py", line ..., in sleep
self._sleep1()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep1
self._sleep2()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep2
time.sleep(self.interval)
Products.LongRequestLogger.dumper WARNING
Thread ...: Started on ...; Running for 5.5 secs; [No request]
Traceback:
...
File ".../LongRequestLogger/tests/common.py", line ..., in sleep
self._sleep1()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep1
self._sleep2()
File ".../LongRequestLogger/tests/common.py", line ..., in _sleep2
time.sleep(self.interval)
''')
config_env_variables = dict(
longrequestlogger_file='null',
longrequestlogger_timeout=None,
longrequestlogger_interval=None,
)
class TestLongRequestLogger(unittest.TestCase):
def setUp(self):
from Products.LongRequestLogger.patch import do_patch
from Products.LongRequestLogger.dumper import logger_name
from zope.testing.loggingsupport import InstalledHandler
self.setTestEnvironment()
do_patch()
self.loghandler = InstalledHandler(logger_name)
self.requests = []
def tearDown(self):
from Products.LongRequestLogger.patch import do_unpatch
do_unpatch()
self.restoreTestEnvironment()
self.loghandler.uninstall()
for request in self.requests:
request.response.stdout.close()
request.clear()
def setTestEnvironment(self):
self.old_env = {}
for var, value in config_env_variables.items():
self.old_env[var] = os.environ.pop(var, None)
if value:
os.environ[var] = value
def restoreTestEnvironment(self):
for var, value in self.old_env.items():
os.environ.pop(var, None)
if value is not None:
os.environ[var] = value
def makeRequest(self, path='/', **kw):
# create fake request and response for convenience
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
stdin = StringIO()
stdout = StringIO()
# minimal environment needed
env = dict(SERVER_NAME='localhost',
SERVER_PORT='80',
REQUEST_METHOD='GET',
SCRIPT_NAME=path)
response = HTTPResponse(stdout=stdout)
request = HTTPRequest(stdin, env, response)
self.requests.append(request)
return request
def testDumperFormat(self):
from Products.LongRequestLogger.dumper import Dumper
dumper = Dumper()
check_dump(dumper.format_thread())
def testDumperRequestExtraction(self):
# The dumper extract requests by looking for the frame that contains
# call_object and then looking for the 'request' variable inside it
from ZPublisher.Publish import call_object
from Products.LongRequestLogger.dumper import Dumper
def callable():
dumper = Dumper()
frame = sys._current_frames()[dumper.thread_id]
return dumper.extract_request(frame)
request = self.makeRequest('/foo')
retrieved_request = call_object(callable, (), request)
self.assertTrue(request is retrieved_request)
def testRequestFormating(self):
from Products.LongRequestLogger.dumper import Dumper
dumper = Dumper()
request = self.makeRequest('/foo/bar')
check_request_formating(dumper.format_request(request))
def testDumperLog(self):
from Products.LongRequestLogger.dumper import Dumper
dumper = Dumper()
# check the dumper will log what we expect when called
dumper()
check_log(str(self.loghandler))
def testMonitorStopBeforeTimeout(self):
from Products.LongRequestLogger.monitor import Monitor
m = Monitor()
# sleep just a little to let the other thread start
s = Sleeper(0.01)
s.sleep()
self.assertTrue(m.isAlive())
m.stop()
self.assertFalse(m.isAlive())
# unless this test is so slow that there were 2 seconds interval
# between starting the monitor and stopping it, there should be no
# logged messages
self.assertFalse(self.loghandler.records)
def testMonitorStopAfterTimeout(self):
from Products.LongRequestLogger.monitor import Monitor
m = Monitor()
s = Sleeper(m.dumper.timeout + 0.5)
# sleep a little more than the timeout to be on the safe side
s.sleep()
m.stop()
check_monitor_log(str(self.loghandler))
def testMonitorStopAfterTimeoutAndTwoIntervals(self):
from Products.LongRequestLogger.monitor import Monitor
m = Monitor()
s = Sleeper(m.dumper.timeout + 2 * m.dumper.interval + 0.5)
# sleep a little more than timeout + intervals to be on the safe
# side
s.sleep()
m.stop()
check_monitor_2_intervals_log(str(self.loghandler))
def testMonitorConfigurationDisabled(self):
from Products.LongRequestLogger.monitor import Monitor
from Products.LongRequestLogger.dumper import DEFAULT_TIMEOUT
from Products.LongRequestLogger.dumper import DEFAULT_INTERVAL
os.environ['longrequestlogger_file'] = ''
m = Monitor()
s = Sleeper(DEFAULT_TIMEOUT + 2 * DEFAULT_INTERVAL + 0.5)
# sleep a little more than timeout + intervals
s.sleep()
# the thread shouldn't run disabled
self.assertFalse(m.isAlive())
# stopping shouldn't break nonetheless
m.stop()
self.assertFalse(m.isAlive())
# and there should be no records
self.assertFalse(self.loghandler.records)
def testMonitorWithEnvironmentConfiguration(self):
from Products.LongRequestLogger.monitor import Monitor
os.environ['longrequestlogger_timeout'] = '3.5'
os.environ['longrequestlogger_interval'] = '2'
m = Monitor()
s = Sleeper(m.dumper.timeout + m.dumper.interval + 0.5)
# sleep a little more than the timeout to be on the safe side
s.sleep()
m.stop()
check_monitor_environment_log(str(self.loghandler))
def testIsPatched(self):
import ZPublisher.Publish
import Products.LongRequestLogger
self.assertEquals(
ZPublisher.Publish.publish_module_standard,
Products.LongRequestLogger.patch.wrapper
)
def testPublish(self):
from ZPublisher.Publish import publish_module_standard
# Before publishing, there should be no slow query records.
self.assertFalse(self.loghandler.records)
# Request taking (timeout + interval + margin) 3.5 seconds...
request = self.makeRequest('/', interval=3.5)
request['interval'] = 3.5
publish_module_standard('Products.LongRequestLogger.tests.common',
request=request,
response=request.response,
debug=True)
# ...should generate query log records like these
check_publishing_1_interval_log(str(self.loghandler))
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(TestLongRequestLogger),
))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment