Commit f52d26b7 authored by Martijn Faassen's avatar Martijn Faassen

Integrate Zope 3-based exception views. Patch by Sidnei, integration

work done for Infrae.
parent 3494ef44
......@@ -100,6 +100,27 @@ Zope Changes
- AccessControl: the form behind the "Security" tab has a new form
for user-related reporting of permissions and roles
- Zope 3-based exception views can now be registered in ZCML for
various exceptions that can be raised by Zope. Registering an
exception view can be done like this:
<browser:page
for="zope.publisher.interfaces.INotFound"
class=".view.SomeView"
name="index.html"
permission="zope.Public" />
Relevant exceptions that can have views are:
zope.interface.common.interfaces.IException
zope.publisher.interfaces.INotFound
zope.security.interfaces.IForbidden
zope.security.interfaces.IUnauthorized
Note that the name has to be 'index.html' for the exception
view to work. (patch by Sidnei da Silva from Enfold,
integration by Martijn Faassen (Startifact) for Infrae)
Bugs Fixed
- Collector #1306: Missing acquisition context on local roles screen.
......
......@@ -13,6 +13,7 @@
"""Initialize the Zope2 Package and provide a published module
"""
from zope.component import queryMultiAdapter
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
......@@ -20,7 +21,7 @@ from Acquisition import aq_acquire
from App.config import getConfiguration
from time import asctime
from types import StringType, ListType
from zExceptions import Unauthorized
from zExceptions import Unauthorized, Redirect
from ZODB.POSException import ConflictError
import transaction
import AccessControl.User
......@@ -37,6 +38,8 @@ import App.ZApplication
import Zope2
import ZPublisher
app = None
startup_time = asctime()
def startup():
global app
......@@ -132,100 +135,124 @@ def validated_hook(request, user):
)
Zope2.DB.removeVersionPool(version)
raise Unauthorized, "You don't have permission to enter versions."
class RequestContainer(ExtensionClass.Base):
def __init__(self,r): self.REQUEST=r
conflict_errors = 0
unresolved_conflict_errors = 0
conflict_logger = logging.getLogger('ZPublisher.Conflict')
class ZPublisherExceptionHook:
def __init__(self):
self.conflict_errors = 0
self.unresolved_conflict_errors = 0
self.conflict_logger = logging.getLogger('ZPublisher.Conflict')
self.error_message = 'standard_error_message'
self.raise_error_message = 'raise_standardErrorMessage'
def logConflicts(self, v, REQUEST):
self.conflict_errors += 1
level = getattr(getConfiguration(), 'conflict_error_log_level', 0)
if not self.conflict_logger.isEnabledFor(level):
return False
self.conflict_logger.log(
level,
"%s at %s: %s (%d conflicts (%d unresolved) "
"since startup at %s)",
v.__class__.__name__,
REQUEST.get('PATH_INFO', '<unknown>'),
v,
self.conflict_errors,
self.unresolved_conflict_errors,
startup_time)
return True
def __call__(self, published, REQUEST, t, v, traceback):
try:
if isinstance(t, StringType):
if t.lower() in ('unauthorized', 'redirect'):
raise
else:
if t is SystemExit or t is Redirect:
raise
def zpublisher_exception_hook(published, REQUEST, t, v, traceback):
global unresolved_conflict_errors
global conflict_errors
try:
if isinstance(t, StringType):
if t.lower() in ('unauthorized', 'redirect'):
raise
else:
if t is SystemExit:
raise
if issubclass(t, ConflictError):
conflict_errors += 1
level = getConfiguration().conflict_error_log_level
if level:
conflict_logger.log(level,
"%s at %s: %s (%d conflicts (%d unresolved) "
"since startup at %s)",
v.__class__.__name__,
REQUEST.get('PATH_INFO', '<unknown>'),
v,
conflict_errors,
unresolved_conflict_errors,
startup_time)
raise ZPublisher.Retry(t, v, traceback)
if t is ZPublisher.Retry:
try:
v.reraise()
except:
# we catch the re-raised exception so that it gets
# stored in the error log and gets rendered with
# standard_error_message
t, v, traceback = sys.exc_info()
if issubclass(t, ConflictError):
# ouch, a user saw this conflict error :-(
unresolved_conflict_errors += 1
self.logConflicts(v, REQUEST)
raise ZPublisher.Retry(t, v, traceback)
if t is ZPublisher.Retry:
try:
v.reraise()
except:
# we catch the re-raised exception so that it gets
# stored in the error log and gets rendered with
# standard_error_message
t, v, traceback = sys.exc_info()
if issubclass(t, ConflictError):
# ouch, a user saw this conflict error :-(
self.unresolved_conflict_errors += 1
try:
log = aq_acquire(published, '__error_log__', containment=1)
except AttributeError:
error_log_url = ''
else:
error_log_url = log.raising((t, v, traceback))
if (getattr(REQUEST.get('RESPONSE', None), '_error_format', '')
!='text/html'):
raise t, v, traceback
if (published is None or published is app or
type(published) is ListType):
# At least get the top-level object
published=app.__bobo_traverse__(REQUEST).__of__(
RequestContainer(REQUEST))
published=getattr(published, 'im_self', published)
while 1:
f=getattr(published, 'raise_standardErrorMessage', None)
if f is None:
published=getattr(published, 'aq_parent', None)
if published is None:
raise t, v, traceback
try:
log = aq_acquire(published, '__error_log__', containment=1)
except AttributeError:
error_log_url = ''
else:
break
client=published
while 1:
if getattr(client, 'standard_error_message', None) is not None:
break
client=getattr(client, 'aq_parent', None)
if client is None:
error_log_url = log.raising((t, v, traceback))
if (getattr(REQUEST.get('RESPONSE', None), '_error_format', '')
!='text/html'):
raise t, v, traceback
if REQUEST.get('AUTHENTICATED_USER', None) is None:
REQUEST['AUTHENTICATED_USER']=AccessControl.User.nobody
# Lookup a view for the exception and render it, then
# raise the rendered value as the exception value
# (basically the same that 'raise_standardErrorMessage'
# does. The view is named 'index.html' because that's what
# Zope 3 uses as well.
view = queryMultiAdapter((v, REQUEST), name=u'index.html')
if view is not None:
v = view()
response = REQUEST.RESPONSE
response.setStatus(t)
response.setBody(v)
return response
if (published is None or published is app or
type(published) is ListType):
# At least get the top-level object
published=app.__bobo_traverse__(REQUEST).__of__(
RequestContainer(REQUEST))
published = getattr(published, 'im_self', published)
while 1:
f = getattr(published, self.raise_error_message, None)
if f is None:
published = getattr(published, 'aq_parent', None)
if published is None:
raise t, v, traceback
else:
break
try:
f(client, REQUEST, t, v, traceback, error_log_url=error_log_url)
except TypeError:
# Pre 2.6 call signature
f(client, REQUEST, t, v, traceback)
client = published
while 1:
if getattr(client, self.error_message, None) is not None:
break
client = getattr(client, 'aq_parent', None)
if client is None:
raise t, v, traceback
if REQUEST.get('AUTHENTICATED_USER', None) is None:
REQUEST['AUTHENTICATED_USER'] = AccessControl.User.nobody
try:
f(client, REQUEST, t, v, traceback, error_log_url=error_log_url)
except TypeError:
# Pre 2.6 call signature
f(client, REQUEST, t, v, traceback)
finally:
traceback=None
finally:
traceback = None
zpublisher_exception_hook = ZPublisherExceptionHook()
ac_logger = logging.getLogger('event.AccessControl')
class TransactionsManager:
......
##############################################################################
#
# Copyright (c) 2007 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Tests of the Zope2.App package."""
##############################################################################
#
# Copyright (c) 2007 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
import sys
import unittest
import logging
from zope.publisher.interfaces import INotFound
from zope.security.interfaces import IUnauthorized
from zope.security.interfaces import IForbidden
from zope.interface.common.interfaces import IException
from zope.app.testing import ztapi
from zope.app.testing.placelesssetup import PlacelessSetup
from zope.publisher.browser import setDefaultSkin
class ExceptionHookTestCase(unittest.TestCase):
def _makeOne(self):
from Zope2.App.startup import ZPublisherExceptionHook
return ZPublisherExceptionHook()
def _makeRequest(self, stdin=None, environ=None,
response=None, clean=1, stdout=None):
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
if stdin is None:
from StringIO import StringIO
stdin = StringIO()
if stdout is None:
from StringIO import StringIO
stdout = StringIO()
if environ is None:
environ = {}
if 'SERVER_NAME' not in environ:
environ['SERVER_NAME'] = 'http://localhost'
if 'SERVER_PORT' not in environ:
environ['SERVER_PORT'] = '8080'
if response is None:
response = HTTPResponse(stdout=stdout)
req = HTTPRequest(stdin, environ, response, clean)
setDefaultSkin(req)
return req
def call(self, published, request, f, args=None, kw=None):
hook = self._makeOne()
try:
if args is None:
args = ()
if kw is None:
kw = {}
f(*args, **kw)
except:
return hook(published, request,
sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2],
)
def call_no_exc(self, hook, published, request, f, args=None, kw=None):
if hook is None:
hook = self._makeOne()
try:
if args is None:
args = ()
if kw is None:
kw = {}
f(*args, **kw)
except:
try:
hook(published, request,
sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2],
)
except:
pass
return hook
def call_exc_value(self, published, request, f, args=None, kw=None):
hook = self._makeOne()
try:
if args is None:
args = ()
if kw is None:
kw = {}
f(*args, **kw)
except:
try:
return hook(published, request,
sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2],
)
except Exception, e:
return e
class ExceptionHookTest(ExceptionHookTestCase):
def testStringException1(self):
def f():
raise 'unauthorized', 'x'
self.assertRaises('unauthorized', self.call, None, None, f)
def testStringException2(self):
def f():
raise 'redirect', 'x'
self.assertRaises('redirect', self.call, None, None, f)
def testSystemExit(self):
def f():
raise SystemExit, 1
self.assertRaises(SystemExit, self.call, None, None, f)
def testUnauthorized(self):
from AccessControl import Unauthorized
def f():
raise Unauthorized, 1
self.assertRaises(Unauthorized, self.call, None, {}, f)
def testConflictErrorRaisesRetry(self):
from ZPublisher import Retry
from ZODB.POSException import ConflictError
from App.config import getConfiguration
def f():
raise ConflictError
request = self._makeRequest()
old_value = getattr(getConfiguration(), 'conflict_error_log_level', 0)
self.assertEquals(old_value, 0) # default value
try:
getConfiguration().conflict_error_log_level = logging.CRITICAL
level = getattr(getConfiguration(), 'conflict_error_log_level', 0)
self.assertEquals(level, logging.CRITICAL)
self.assertRaises(Retry, self.call, None, request, f)
finally:
getConfiguration().conflict_error_log_level = old_value
def testConflictErrorCount(self):
from ZPublisher import Retry
from ZODB.POSException import ConflictError
def f():
raise ConflictError
hook = self._makeOne()
self.assertEquals(hook.conflict_errors, 0)
self.call_no_exc(hook, None, None, f)
self.assertEquals(hook.conflict_errors, 1)
self.call_no_exc(hook, None, None, f)
self.assertEquals(hook.conflict_errors, 2)
def testRetryRaisesOriginalException(self):
from ZPublisher import Retry
class CustomException(Exception):
pass
def f():
try:
raise CustomException, 'Zope'
except:
raise Retry(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2])
self.assertRaises(CustomException, self.call, None, {}, f)
def testRetryRaisesConflictError(self):
from ZPublisher import Retry
from ZODB.POSException import ConflictError
def f():
try:
raise ConflictError
except:
raise Retry(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2])
self.assertRaises(ConflictError, self.call, None, {}, f)
def testRetryUnresolvedConflictErrorCount(self):
from ZPublisher import Retry
from ZODB.POSException import ConflictError
def f():
try:
raise ConflictError
except:
raise Retry(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2])
hook = self._makeOne()
self.assertEquals(hook.unresolved_conflict_errors, 0)
self.call_no_exc(hook, None, None, f)
self.assertEquals(hook.unresolved_conflict_errors, 1)
self.call_no_exc(hook, None, None, f)
self.assertEquals(hook.unresolved_conflict_errors, 2)
class Client:
def __init__(self):
self.standard_error_message = True
self.messages = []
def dummyMethod(self):
return 'Aye'
class OldClient(Client):
def raise_standardErrorMessage(self, c, r, t, v, tb):
from zExceptions.ExceptionFormatter import format_exception
self.messages.append(''.join(format_exception(t, v, tb, as_html=0)))
class StandardClient(Client):
def raise_standardErrorMessage(self, c, r, t, v, tb, error_log_url):
from zExceptions.ExceptionFormatter import format_exception
fmt = format_exception(t, v, tb, as_html=0)
self.messages.append(''.join([error_log_url] + fmt))
class BrokenClient(Client):
def raise_standardErrorMessage(self, c, r, t, v, tb, error_log_url):
raise AttributeError, 'ouch'
class ExceptionMessageRenderTest(ExceptionHookTestCase):
def testRenderUnauthorizedOldClient(self):
from AccessControl import Unauthorized
def f():
raise Unauthorized, 1
request = self._makeRequest()
client = OldClient()
self.call(client, request, f)
self.failUnless(client.messages, client.messages)
tb = client.messages[0]
self.failUnless("Unauthorized: You are not allowed" in tb, tb)
def testRenderUnauthorizedStandardClient(self):
from AccessControl import Unauthorized
def f():
raise Unauthorized, 1
request = self._makeRequest()
client = StandardClient()
self.call(client, request, f)
self.failUnless(client.messages, client.messages)
tb = client.messages[0]
self.failUnless("Unauthorized: You are not allowed" in tb, tb)
def testRenderUnauthorizedStandardClientMethod(self):
from AccessControl import Unauthorized
def f():
raise Unauthorized, 1
request = self._makeRequest()
client = StandardClient()
self.call(client.dummyMethod, request, f)
self.failUnless(client.messages, client.messages)
tb = client.messages[0]
self.failUnless("Unauthorized: You are not allowed" in tb, tb)
def testRenderUnauthorizedBrokenClient(self):
from AccessControl import Unauthorized
def f():
raise Unauthorized, 1
request = self._makeRequest()
client = BrokenClient()
self.assertRaises(AttributeError, self.call, client, request, f)
def testRenderRetryRaisesOriginalException(self):
from ZPublisher import Retry
class CustomException(Exception):
pass
def f():
try:
raise CustomException, 'Zope'
except:
raise Retry(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2])
request = self._makeRequest()
client = StandardClient()
self.call(client, request, f)
self.failUnless(client.messages, client.messages)
tb = client.messages[0]
self.failUnless("CustomException: Zope" in tb, tb)
def testRenderRetryRaisesConflictError(self):
from ZPublisher import Retry
from ZODB.POSException import ConflictError
def f():
try:
raise ConflictError
except:
raise Retry(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2])
request = self._makeRequest()
client = StandardClient()
self.call(client, request, f)
self.failUnless(client.messages, client.messages)
tb = client.messages[0]
self.failUnless("ConflictError: database conflict error" in tb, tb)
class CustomExceptionView:
def __init__(self, context, request):
self.context = context
self.request = request
def __call__(self):
return "Exception View: %s" % self.context.__class__.__name__
class ExceptionViewsTest(PlacelessSetup, ExceptionHookTestCase):
def testCustomExceptionViewUnauthorized(self):
from ZPublisher.HTTPResponse import HTTPResponse
from AccessControl import Unauthorized
ztapi.browserView(IUnauthorized, u'index.html', CustomExceptionView)
def f():
raise Unauthorized, 1
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.failUnless(isinstance(v, HTTPResponse), v)
self.failUnless(v.status == 401, (v.status, 401))
self.failUnless("Exception View: Unauthorized" in str(v))
def testCustomExceptionViewForbidden(self):
from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions import Forbidden
ztapi.browserView(IForbidden, u'index.html', CustomExceptionView)
def f():
raise Forbidden, "argh"
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.failUnless(isinstance(v, HTTPResponse), v)
self.failUnless(v.status == 403, (v.status, 403))
self.failUnless("Exception View: Forbidden" in str(v))
def testCustomExceptionViewNotFound(self):
from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions import NotFound
ztapi.browserView(INotFound, u'index.html', CustomExceptionView)
def f():
raise NotFound, "argh"
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.failUnless(isinstance(v, HTTPResponse), v)
self.failUnless(v.status == 404, (v.status, 404))
self.failUnless("Exception View: NotFound" in str(v), v)
def testCustomExceptionViewBadRequest(self):
from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions import BadRequest
ztapi.browserView(IException, u'index.html', CustomExceptionView)
def f():
raise BadRequest, "argh"
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.failUnless(isinstance(v, HTTPResponse), v)
self.failUnless(v.status == 400, (v.status, 400))
self.failUnless("Exception View: BadRequest" in str(v), v)
def testCustomExceptionViewInternalError(self):
from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions import InternalError
ztapi.browserView(IException, u'index.html', CustomExceptionView)
def f():
raise InternalError, "argh"
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.failUnless(isinstance(v, HTTPResponse), v)
self.failUnless(v.status == 500, (v.status, 500))
self.failUnless("Exception View: InternalError" in str(v), v)
def testRedirectNoExceptionView(self):
from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions import Redirect
ztapi.browserView(IException, u'index.html', CustomExceptionView)
def f():
raise Redirect, "http://zope.org/"
request = self._makeRequest()
client = StandardClient()
v = self.call_exc_value(client, request, f)
self.failUnless(isinstance(v, Redirect), v)
self.assertEquals(v.args[0], "http://zope.org/")
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ExceptionHookTest))
suite.addTest(unittest.makeSuite(ExceptionMessageRenderTest))
suite.addTest(unittest.makeSuite(ExceptionViewsTest))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
......@@ -20,17 +20,22 @@ $Id$
from unauthorized import Unauthorized
from zope.interface import implements
from zope.interface.common.interfaces import IException
from zope.publisher.interfaces import INotFound
from zope.security.interfaces import IForbidden
class BadRequest(Exception):
pass
implements(IException)
class InternalError(Exception):
pass
implements(IException)
class NotFound(Exception):
pass
implements(INotFound)
class Forbidden(Exception):
pass
implements(IForbidden)
class MethodNotAllowed(Exception):
pass
......
......@@ -15,9 +15,13 @@ $Id$
"""
from types import StringType
from zope.interface import implements
from zope.security.interfaces import IUnauthorized
class Unauthorized(Exception):
"""Some user wasn't allowed to access a resource"""
"""Some user wasn't allowed to access a resource
"""
implements(IUnauthorized)
def __init__(self, message=None, value=None, needed=None, name=None, **kw):
"""Possible signatures:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment