Commit ddea96bb authored by Hanno Schlichting's avatar Hanno Schlichting

Move exceptionhook code into ZServer.

parent 10dbbaee
...@@ -14,27 +14,15 @@ ...@@ -14,27 +14,15 @@
""" """
import imp import imp
import logging
import sys import sys
from time import asctime from time import asctime
import AccessControl.User import AccessControl.User
from AccessControl.SecurityManagement import newSecurityManager from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager from AccessControl.SecurityManagement import noSecurityManager
from Acquisition import (
aq_acquire,
aq_base,
aq_parent,
)
from Acquisition.interfaces import IAcquirer
import ExtensionClass
from six import reraise
import transaction import transaction
from zExceptions import Redirect
from zExceptions import Unauthorized
import ZODB import ZODB
from ZODB.POSException import ConflictError from zope.deferredimport import deprecated
from zope.component import queryMultiAdapter
from zope.event import notify from zope.event import notify
from zope.processlifetime import DatabaseOpened from zope.processlifetime import DatabaseOpened
from zope.processlifetime import DatabaseOpenedWithRoot from zope.processlifetime import DatabaseOpenedWithRoot
...@@ -43,7 +31,15 @@ from App.config import getConfiguration ...@@ -43,7 +31,15 @@ from App.config import getConfiguration
import App.ZApplication import App.ZApplication
import OFS.Application import OFS.Application
import Zope2 import Zope2
from ZPublisher import Retry
# BBB Zope 5.0
deprecated(
'Please import from ZServer.ZPublisher.exceptionhook.',
RequestContainer='ZServer.ZPublisher.exceptionhook:RequestContainer',
zpublisher_exception_hook=(
'ZServer.ZPublisher.exceptionhook:EXCEPTION_HOOK'),
ZPublisherExceptionHook='ZServer.ZPublisher.exceptionhook:ExceptionHook',
)
app = None app = None
startup_time = asctime() startup_time = asctime()
...@@ -153,152 +149,8 @@ def startup(): ...@@ -153,152 +149,8 @@ def startup():
notify(DatabaseOpenedWithRoot(DB)) notify(DatabaseOpenedWithRoot(DB))
Zope2.zpublisher_transactions_manager = transaction.manager Zope2.zpublisher_transactions_manager = transaction.manager
Zope2.zpublisher_exception_hook = zpublisher_exception_hook
Zope2.zpublisher_validated_hook = validated_hook Zope2.zpublisher_validated_hook = validated_hook
def validated_hook(request, user): def validated_hook(request, user):
newSecurityManager(request, user) newSecurityManager(request, user)
class RequestContainer(ExtensionClass.Base):
def __init__(self, r):
self.REQUEST = r
class ZPublisherExceptionHook:
def __init__(self):
self.conflict_errors = 0
self.unresolved_conflict_errors = 0
self.conflict_logger = logging.getLogger('ZPublisher.Conflict')
self.error_message = 'standard_error_message'
self.raise_error_message = 'raise_standardErrorMessage'
def logConflicts(self, v, REQUEST):
self.conflict_errors += 1
level = getattr(getConfiguration(), 'conflict_error_log_level', 0)
if not self.conflict_logger.isEnabledFor(level):
return False
self.conflict_logger.log(
level,
"%s at %s: %s (%d conflicts (%d unresolved) "
"since startup at %s)",
v.__class__.__name__,
REQUEST.get('PATH_INFO', '<unknown>'),
v,
self.conflict_errors,
self.unresolved_conflict_errors,
startup_time)
return True
def __call__(self, published, REQUEST, t, v, traceback):
try:
if t is SystemExit or issubclass(t, Redirect):
reraise(t, v, traceback)
if issubclass(t, ConflictError):
self.logConflicts(v, REQUEST)
raise Retry(t, v, traceback)
if t is Retry:
try:
v.reraise()
except:
# we catch the re-raised exception so that it gets
# stored in the error log and gets rendered with
# standard_error_message
t, v, traceback = sys.exc_info()
if issubclass(t, ConflictError):
# ouch, a user saw this conflict error :-(
self.unresolved_conflict_errors += 1
error_log_url = ''
if not isinstance(published, list):
try:
log = aq_acquire(published, '__error_log__', containment=1)
except AttributeError:
pass
else:
if log is not None:
error_log_url = log.raising((t, v, traceback))
if (REQUEST is None or
(getattr(REQUEST.get('RESPONSE', None),
'_error_format', '') != 'text/html')):
reraise(t, v, traceback)
# Lookup a view for the exception and render it, then
# raise the rendered value as the exception value
# (basically the same that 'raise_standardErrorMessage'
# does. The view is named 'index.html' because that's what
# zope.publisher uses as well.
view = queryMultiAdapter((v, REQUEST), name=u'index.html')
if view is not None:
if (IAcquirer.providedBy(view) and
IAcquirer.providedBy(published)):
view = view.__of__(published)
else:
view.__parent__ = published
v = view()
if issubclass(t, Unauthorized):
# Re-raise Unauthorized to make sure it is handled
# correctly. We can't do that with all exceptions
# because some don't work with the rendered v as
# argument.
reraise(t, v, traceback)
response = REQUEST.RESPONSE
response.setStatus(t)
response.setBody(v)
return response
if (published is None or published is app or
isinstance(published, list)):
# At least get the top-level object
published = app.__bobo_traverse__(REQUEST).__of__(
RequestContainer(REQUEST))
published = getattr(published, 'im_self', published)
while 1:
f = getattr(published, self.raise_error_message, None)
if f is None:
published = aq_parent(published)
if published is None:
reraise(t, v, traceback)
else:
break
client = published
while 1:
if getattr(client, self.error_message, None) is not None:
break
client = aq_parent(client)
# If we are going in circles without getting the error_message
# let the response handle it
if client is None or aq_base(client) is aq_base(published):
response = REQUEST.RESPONSE
response.exception()
return response
if REQUEST.get('AUTHENTICATED_USER', None) is None:
REQUEST['AUTHENTICATED_USER'] = AccessControl.User.nobody
result = f(client, REQUEST, t, v, traceback,
error_log_url=error_log_url)
if result is not None:
t, v, traceback = result
if issubclass(t, Unauthorized):
# Re-raise Unauthorized to make sure it is handled
# correctly. We can't do that with all exceptions
# because some don't work with the rendered v as
# argument.
reraise(t, v, traceback)
response = REQUEST.RESPONSE
response.setStatus(t)
response.setBody(v)
return response
finally:
traceback = None
zpublisher_exception_hook = ZPublisherExceptionHook()
##############################################################################
#
# Copyright (c) 2007 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import sys
import unittest
import logging
import Acquisition
from zope.component.testing import PlacelessSetup
from zope.interface.common.interfaces import IException
from zope.publisher.skinnable import setDefaultSkin
from zope.publisher.interfaces import INotFound
from zope.security.interfaces import IUnauthorized
from zope.security.interfaces import IForbidden
class ExceptionHookTestCase(unittest.TestCase):
def _makeOne(self):
from Zope2.App.startup import ZPublisherExceptionHook
return ZPublisherExceptionHook()
def _makeRequest(self, stdin=None, environ=None,
response=None, clean=1, stdout=None):
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
if stdin is None:
from StringIO import StringIO
stdin = StringIO()
if stdout is None:
from StringIO import StringIO
stdout = StringIO()
if environ is None:
environ = {}
if 'SERVER_NAME' not in environ:
environ['SERVER_NAME'] = 'http://localhost'
if 'SERVER_PORT' not in environ:
environ['SERVER_PORT'] = '8080'
if response is None:
response = HTTPResponse(stdout=stdout)
req = HTTPRequest(stdin, environ, response, clean)
setDefaultSkin(req)
return req
def call(self, published, request, f, args=None, kw=None):
hook = self._makeOne()
try:
if args is None:
args = ()
if kw is None:
kw = {}
f(*args, **kw)
except:
return hook(published, request,
sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2],
)
def call_no_exc(self, hook, published, request, f, args=None, kw=None):
if hook is None:
hook = self._makeOne()
try:
if args is None:
args = ()
if kw is None:
kw = {}
f(*args, **kw)
except:
try:
hook(published, request,
sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2],
)
except:
pass
return hook
def call_exc_value(self, published, request, f, args=None, kw=None):
hook = self._makeOne()
try:
if args is None:
args = ()
if kw is None:
kw = {}
f(*args, **kw)
except:
try:
return hook(published, request,
sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2],
)
except Exception as e:
return e
class ExceptionHookTest(ExceptionHookTestCase):
def testSystemExit(self):
def f():
raise SystemExit(1)
self.assertRaises(SystemExit, self.call, None, None, f)
def testUnauthorized(self):
from AccessControl import Unauthorized
def f():
raise Unauthorized('1')
self.assertRaises(Unauthorized, self.call, None, {}, f)
def testConflictErrorRaisesRetry(self):
from ZPublisher import Retry
from ZODB.POSException import ConflictError
from App.config import getConfiguration
def f():
raise ConflictError()
request = self._makeRequest()
old_value = getattr(getConfiguration(), 'conflict_error_log_level', 0)
self.assertEquals(old_value, 0) # default value
try:
getConfiguration().conflict_error_log_level = logging.CRITICAL
level = getattr(getConfiguration(), 'conflict_error_log_level', 0)
self.assertEquals(level, logging.CRITICAL)
self.assertRaises(Retry, self.call, None, request, f)
finally:
getConfiguration().conflict_error_log_level = old_value
def testConflictErrorCount(self):
from ZODB.POSException import ConflictError
def f():
raise ConflictError()
hook = self._makeOne()
self.assertEquals(hook.conflict_errors, 0)
self.call_no_exc(hook, None, None, f)
self.assertEquals(hook.conflict_errors, 1)
self.call_no_exc(hook, None, None, f)
self.assertEquals(hook.conflict_errors, 2)
def testRetryRaisesOriginalException(self):
from ZPublisher import Retry
class CustomException(Exception):
pass
def f():
try:
raise CustomException('Zope')
except:
raise Retry(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2])
self.assertRaises(CustomException, self.call, None, {}, f)
def testRetryRaisesConflictError(self):
from ZPublisher import Retry
from ZODB.POSException import ConflictError
def f():
try:
raise ConflictError()
except:
raise Retry(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2])
self.assertRaises(ConflictError, self.call, None, {}, f)
def testRetryUnresolvedConflictErrorCount(self):
from ZPublisher import Retry
from ZODB.POSException import ConflictError
def f():
try:
raise ConflictError()
except:
raise Retry(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2])
hook = self._makeOne()
self.assertEquals(hook.unresolved_conflict_errors, 0)
self.call_no_exc(hook, None, None, f)
self.assertEquals(hook.unresolved_conflict_errors, 1)
self.call_no_exc(hook, None, None, f)
self.assertEquals(hook.unresolved_conflict_errors, 2)
class Client(Acquisition.Explicit):
def __init__(self):
self.standard_error_message = True
self.messages = []
def dummyMethod(self):
return 'Aye'
class StandardClient(Client):
def raise_standardErrorMessage(self, c, r, t, v, tb, error_log_url):
from zExceptions.ExceptionFormatter import format_exception
fmt = format_exception(t, v, tb, as_html=0)
self.messages.append(''.join([error_log_url] + fmt))
class BrokenClient(Client):
def raise_standardErrorMessage(self, c, r, t, v, tb, error_log_url):
raise AttributeError('ouch')
class ExceptionMessageRenderTest(ExceptionHookTestCase):
def testRenderUnauthorizedStandardClient(self):
from AccessControl import Unauthorized
def f():
raise Unauthorized('1')
request = self._makeRequest()
client = StandardClient()
self.call(client, request, f)
self.assertTrue(client.messages, client.messages)
tb = client.messages[0]
self.assertTrue("Unauthorized: You are not allowed" in tb, tb)
def testRenderUnauthorizedStandardClientMethod(self):
from AccessControl import Unauthorized
def f():
raise Unauthorized('1')
request = self._makeRequest()
client = StandardClient()
self.call(client.dummyMethod, request, f)
self.assertTrue(client.messages, client.messages)
tb = client.messages[0]
self.assertTrue("Unauthorized: You are not allowed" in tb, tb)
def testRenderUnauthorizedBrokenClient(self):
from AccessControl import Unauthorized
def f():
raise Unauthorized('1')
request = self._makeRequest()
client = BrokenClient()
self.assertRaises(AttributeError, self.call, client, request, f)
def testRenderRetryRaisesOriginalException(self):
from ZPublisher import Retry
class CustomException(Exception):
pass
def f():
try:
raise CustomException('Zope')
except:
raise Retry(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2])
request = self._makeRequest()
client = StandardClient()
self.call(client, request, f)
self.assertTrue(client.messages, client.messages)
tb = client.messages[0]
self.assertTrue("CustomException: Zope" in tb, tb)
def testRenderRetryRaisesConflictError(self):
from ZPublisher import Retry
from ZODB.POSException import ConflictError
def f():
try:
raise ConflictError()
except:
raise Retry(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2])
request = self._makeRequest()
client = StandardClient()
self.call(client, request, f)
self.assertTrue(client.messages, client.messages)
tb = client.messages[0]
self.assertTrue("ConflictError: database conflict error" in tb, tb)
class CustomExceptionView(Acquisition.Explicit):
def __init__(self, context, request):
self.context = context
self.request = request
def __call__(self):
return ("Exception View: %s\nContext: %s" % (
self.context.__class__.__name__,
Acquisition.aq_parent(self).__class__.__name__))
def registerExceptionView(for_):
from zope.interface import Interface
from zope.component import getGlobalSiteManager
from zope.publisher.interfaces.browser import IDefaultBrowserLayer
gsm = getGlobalSiteManager()
gsm.registerAdapter(
CustomExceptionView,
required=(for_, IDefaultBrowserLayer),
provided=Interface,
name=u'index.html',
)
class ExceptionViewsTest(PlacelessSetup, ExceptionHookTestCase):
def testCustomExceptionViewUnauthorized(self):
from AccessControl import Unauthorized
registerExceptionView(IUnauthorized)
def f():
raise Unauthorized('1')
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.assertTrue(isinstance(v, Unauthorized), v)
self.assertTrue("Exception View: Unauthorized" in str(v))
self.assertTrue("Context: StandardClient" in str(v))
def testCustomExceptionViewForbidden(self):
from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions import Forbidden
registerExceptionView(IForbidden)
def f():
raise Forbidden("argh")
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.assertTrue(isinstance(v, HTTPResponse), v)
self.assertTrue(v.status == 403, (v.status, 403))
self.assertTrue("Exception View: Forbidden" in str(v))
def testCustomExceptionViewNotFound(self):
from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions import NotFound
registerExceptionView(INotFound)
def f():
raise NotFound("argh")
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.assertTrue(isinstance(v, HTTPResponse), v)
self.assertTrue(v.status == 404, (v.status, 404))
self.assertTrue("Exception View: NotFound" in str(v), v)
def testCustomExceptionViewBadRequest(self):
from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions import BadRequest
registerExceptionView(IException)
def f():
raise BadRequest("argh")
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.assertTrue(isinstance(v, HTTPResponse), v)
self.assertTrue(v.status == 400, (v.status, 400))
self.assertTrue("Exception View: BadRequest" in str(v), v)
def testCustomExceptionViewInternalError(self):
from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions import InternalError
registerExceptionView(IException)
def f():
raise InternalError("argh")
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.assertTrue(isinstance(v, HTTPResponse), v)
self.assertTrue(v.status == 500, (v.status, 500))
self.assertTrue("Exception View: InternalError" in str(v), v)
def testRedirectNoExceptionView(self):
from zExceptions import Redirect
registerExceptionView(IException)
def f():
raise Redirect("http://zope.org/")
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.assertTrue(isinstance(v, Redirect), v)
self.assertEquals(v.args[0], "http://zope.org/")
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment