Commit 8d81b128 authored by Jérome Perrin's avatar Jérome Perrin

ERP5TypeTestCase: stop copying ZopeTestCase's publish implementation

Call the original method instead of copy/pasting the code

The original method will use .encode() on the basic= argument, so it
needs to be an ascii str or an unicode on python2, we had to change
some callers to pass basic= as text, to prevent:

     File "eggs/Zope-4.5.3+slapospatched001-py2.7.egg/Testing/ZopeTestCase/functional.py", line 43, in wrapped_func
       return func(*args, **kw)
     File "eggs/Zope-4.5.3+slapospatched001-py2.7.egg/Testing/ZopeTestCase/functional.py", line 93, in publish
       env['HTTP_AUTHORIZATION'] = basic_auth_encode(basic)
     File "eggs/Zope-4.5.3+slapospatched001-py2.7.egg/ZPublisher/utils.py", line 91, in basic_auth_encode
       header = b'Basic ' + base64.b64encode(value.encode('latin-1'))
   UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 2: ordinal not in range(128)
parent 4752957c
...@@ -59,7 +59,7 @@ from zope.site.hooks import setSite ...@@ -59,7 +59,7 @@ from zope.site.hooks import setSite
from Testing import ZopeTestCase from Testing import ZopeTestCase
from Testing.makerequest import makerequest from Testing.makerequest import makerequest
from Testing.ZopeTestCase import PortalTestCase, user_name, ZopeLite from Testing.ZopeTestCase import PortalTestCase, user_name, ZopeLite, functional
from Products.ERP5Type.Core.Workflow import ValidationFailed from Products.ERP5Type.Core.Workflow import ValidationFailed
from Products.PythonScripts.PythonScript import PythonScript from Products.PythonScripts.PythonScript import PythonScript
from Products.ERP5Type.Accessor.Constant import PropertyGetter as ConstantGetter from Products.ERP5Type.Accessor.Constant import PropertyGetter as ConstantGetter
...@@ -103,6 +103,15 @@ from Products.ZSQLCatalog.SQLCatalog import Query ...@@ -103,6 +103,15 @@ from Products.ZSQLCatalog.SQLCatalog import Query
from Acquisition import aq_base from Acquisition import aq_base
if six.PY2:
from contextlib import contextmanager
@contextmanager
def nullcontext(enter_result=None):
yield enter_result
else:
from contextlib import nullcontext
portal_name = 'erp5_portal' portal_name = 'erp5_portal'
# we keep a reference to all sites for which setup failed the first time, to # we keep a reference to all sites for which setup failed the first time, to
...@@ -218,7 +227,7 @@ def _parse_args(self, *args, **kw): ...@@ -218,7 +227,7 @@ def _parse_args(self, *args, **kw):
_parse_args._original = DateTime._original_parse_args _parse_args._original = DateTime._original_parse_args
DateTime._parse_args = _parse_args DateTime._parse_args = _parse_args
class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase, functional.Functional):
"""Mixin class for ERP5 based tests. """Mixin class for ERP5 based tests.
""" """
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
...@@ -748,116 +757,46 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): ...@@ -748,116 +757,46 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
ZopeTestCase._print('\n%s ' % message) ZopeTestCase._print('\n%s ' % message)
LOG('Testing ... ', DEBUG, message) LOG('Testing ... ', DEBUG, message)
def publish(self, path, basic=None, env=None, extra=None, user=None, def publish(self, path, basic=None, env=None, extra=None,
request_method='GET', stdin=None, handle_errors=True): request_method='GET', stdin=None, handle_errors=True,
'''Publishes the object at 'path' returning a response object.''' user=None):
'''Publishes the object at 'path' returning a response object.
from ZPublisher.HTTPResponse import WSGIResponse
from ZPublisher.WSGIPublisher import publish_module
from ZPublisher.httpexceptions import HTTPExceptionHandler
from Testing.ZopeTestCase.functional import ResponseWrapper
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
# Save current security manager
sm = getSecurityManager()
# Commit the sandbox for good measure
self.commit()
if env is None:
env = {}
request = self.app.REQUEST
env['SERVER_NAME'] = request['SERVER_NAME']
env['SERVER_PORT'] = request['SERVER_PORT']
env['HTTP_ACCEPT_CHARSET'] = request['HTTP_ACCEPT_CHARSET']
env['SERVER_PROTOCOL'] = 'HTTP/1.1'
env['REQUEST_METHOD'] = request_method
query = ''
if '?' in path:
path, query = path.split("?", 1)
if six.PY2:
env['PATH_INFO'] = unquote_to_bytes(path)
else:
env['PATH_INFO'] = unquote_to_bytes(path).decode('latin-1')
env['QUERY_STRING'] = query
if basic:
assert not user, (basic, user)
env['HTTP_AUTHORIZATION'] = "Basic %s" % \
base64.encodestring(basic).replace('\n', '')
elif user:
PAS = self.portal.acl_users.__class__
orig_extractUserIds = PAS._extractUserIds
from thread import get_ident
me = get_ident()
def _extractUserIds(pas, request, plugins):
if me == get_ident():
info = pas._verifyUser(plugins, user)
return [(info['id'], info['login'])] if info else ()
return orig_extractUserIds(pas, request, plugins)
if stdin is None:
stdin = BytesIO()
outstream = BytesIO()
response = WSGIResponse(stdout=outstream, stderr=sys.stderr)
try:
if user:
PAS._extractUserIds = _extractUserIds
# The following `HTTPRequest` object would be created anyway inside
# `publish_module_standard` if no `request` argument was given.
request = request.__class__(stdin, env, response)
# However, we need to inject the content of `extra` inside the
# request.
if extra:
for k, v in six.iteritems(extra): request[k] = v
wsgi_headers = BytesIO()
def start_response(status, headers):
# Keep the fake response in-sync with the actual values
# from the WSGI start_response call.
response.setStatus(status.split()[0])
for key, value in headers:
response.setHeader(key, value)
wsgi_headers.write(
b'HTTP/1.1 ' + status.encode('ascii') + b'\r\n')
headers = b'\r\n'.join([
(k + ': ' + v).encode('ascii') for k, v in headers])
wsgi_headers.write(headers)
wsgi_headers.write(b'\r\n\r\n')
publish = partial(publish_module, _request=request, _response=response)
if handle_errors:
publish = HTTPExceptionHandler(publish)
wsgi_result = publish(env, start_response)
finally:
if user:
PAS._extractUserIds = orig_extractUserIds
# Restore security manager
setSecurityManager(sm)
# Restore site removed by closing of request
setSite(self.portal)
This is from Testing.ZopeTestCase.Functional, extended to support passing
a user id directly.
'''
if user:
assert not basic
from six.moves._thread import get_ident
me = get_ident()
def _extractUserIds(pas, request, plugins):
if me == get_ident():
info = pas._verifyUser(plugins, user)
return [(info['id'], info['login'])] if info else ()
return mock.DEFAULT
user_context = mock.patch(
'Products.PluggableAuthService.PluggableAuthService.PluggableAuthService._extractUserIds',
side_effect=_extractUserIds,
autospec=True)
else:
user_context = nullcontext()
try:
with user_context:
return super(ERP5TypeTestCaseMixin, self).publish(
path,
basic=basic,
env=env,
extra=extra,
request_method=request_method,
stdin=stdin,
handle_errors=handle_errors,
)
finally:
# Make sure that the skin cache does not have objects that were # Make sure that the skin cache does not have objects that were
# loaded with the connection used by the requested url. # loaded with the connection used by the requested url.
self.changeSkin(self.portal.getCurrentSkinName()) self.changeSkin(self.portal.getCurrentSkinName())
return ResponseWrapper(response, outstream, path,
wsgi_result, wsgi_headers)
def getConsistencyMessageList(self, obj): def getConsistencyMessageList(self, obj):
return sorted([ str(message.getMessage()) return sorted([ str(message.getMessage())
for message in obj.checkConsistency() ]) for message in obj.checkConsistency() ])
......
  • Ah the commit message is outdated, all this part:

    The original method will use .encode() on the basic= argument, so it
    needs to be an ascii str or an unicode on python2, we had to change
    some callers to pass basic= as text, to prevent:
    
         File "eggs/Zope-4.5.3+slapospatched001-py2.7.egg/Testing/ZopeTestCase/functional.py", line 43, in wrapped_func
           return func(*args, **kw)
         File "eggs/Zope-4.5.3+slapospatched001-py2.7.egg/Testing/ZopeTestCase/functional.py", line 93, in publish
           env['HTTP_AUTHORIZATION'] = basic_auth_encode(basic)
         File "eggs/Zope-4.5.3+slapospatched001-py2.7.egg/ZPublisher/utils.py", line 91, in basic_auth_encode
           header = b'Basic ' + base64.b64encode(value.encode('latin-1'))
       UnicodeDecodeError: 'ascii' codec can't decode byte 0xc3 in position 2: ordinal not in range(128)

    should be removed, this was actually a problem in Zope but we fixed it in Zope

  • mentioned in commit ae80fbeb

    Toggle commit list
  • I reverted and pushed again with a proper commit message in 5fc0cd59

  • mentioned in commit d4ed9dc7

    Toggle commit list
  • mentioned in commit d3b66b7a

    Toggle commit list
  • mentioned in commit 1687e35b

    Toggle commit list
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment