Commit da089bf2 authored by Jérome Perrin's avatar Jérome Perrin Committed by Arnaud Fontaine

ERP5TypeTestCase: stop copying ZopeTestCase's publish implementation

Call the original method instead of copy/pasting the code

We still keep the following patches:
 - user: this is an ERP5 addition and maybe we use it in customer project
   code (if we don't I'm in favor of removing the feature, it's used in
   only one place in this repo)
 - call `self.changeSkin(self.portal.getCurrentSkinName())`. Maybe this
   is needed for CMFCore tests and could be good to send upstream, I did
   not check this part
parent d4ed9dc7
......@@ -59,7 +59,7 @@ from zope.site.hooks import setSite
from Testing import ZopeTestCase
from Testing.makerequest import makerequest
from Testing.ZopeTestCase import PortalTestCase, user_name, ZopeLite
from Testing.ZopeTestCase import PortalTestCase, user_name, ZopeLite, functional
from Products.ERP5Type.Core.Workflow import ValidationFailed
from Products.PythonScripts.PythonScript import PythonScript
from Products.ERP5Type.Accessor.Constant import PropertyGetter as ConstantGetter
......@@ -104,6 +104,15 @@ from Products.ZSQLCatalog.SQLCatalog import Query
from Acquisition import aq_base
if six.PY2:
from contextlib import contextmanager
@contextmanager
def nullcontext(enter_result=None):
yield enter_result
else:
from contextlib import nullcontext
portal_name = 'erp5_portal'
# we keep a reference to all sites for which setup failed the first time, to
......@@ -219,7 +228,7 @@ def _parse_args(self, *args, **kw):
_parse_args._original = DateTime._original_parse_args
DateTime._parse_args = _parse_args
class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase, functional.Functional):
"""Mixin class for ERP5 based tests.
"""
def __init__(self, *args, **kw):
......@@ -750,116 +759,46 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
ZopeTestCase._print('\n%s ' % message)
LOG('Testing ... ', DEBUG, message)
def publish(self, path, basic=None, env=None, extra=None, user=None,
request_method='GET', stdin=None, handle_errors=True):
'''Publishes the object at 'path' returning a response object.'''
from ZPublisher.HTTPResponse import WSGIResponse
from ZPublisher.WSGIPublisher import publish_module
from ZPublisher.httpexceptions import HTTPExceptionHandler
from Testing.ZopeTestCase.functional import ResponseWrapper
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
# Save current security manager
sm = getSecurityManager()
# Commit the sandbox for good measure
self.commit()
if env is None:
env = {}
request = self.app.REQUEST
env['SERVER_NAME'] = request['SERVER_NAME']
env['SERVER_PORT'] = request['SERVER_PORT']
env['HTTP_ACCEPT_CHARSET'] = request['HTTP_ACCEPT_CHARSET']
env['SERVER_PROTOCOL'] = 'HTTP/1.1'
env['REQUEST_METHOD'] = request_method
query = ''
if '?' in path:
path, query = path.split("?", 1)
if six.PY2:
env['PATH_INFO'] = unquote_to_bytes(path)
else:
env['PATH_INFO'] = unquote_to_bytes(path).decode('latin-1')
env['QUERY_STRING'] = query
if basic:
assert not user, (basic, user)
env['HTTP_AUTHORIZATION'] = "Basic %s" % \
base64.encodestring(basic).replace('\n', '')
elif user:
PAS = self.portal.acl_users.__class__
orig_extractUserIds = PAS._extractUserIds
from thread import get_ident
me = get_ident()
def _extractUserIds(pas, request, plugins):
if me == get_ident():
info = pas._verifyUser(plugins, user)
return [(info['id'], info['login'])] if info else ()
return orig_extractUserIds(pas, request, plugins)
if stdin is None:
stdin = BytesIO()
outstream = BytesIO()
response = WSGIResponse(stdout=outstream, stderr=sys.stderr)
try:
if user:
PAS._extractUserIds = _extractUserIds
# The following `HTTPRequest` object would be created anyway inside
# `publish_module_standard` if no `request` argument was given.
request = request.__class__(stdin, env, response)
# However, we need to inject the content of `extra` inside the
# request.
if extra:
for k, v in six.iteritems(extra): request[k] = v
wsgi_headers = BytesIO()
def start_response(status, headers):
# Keep the fake response in-sync with the actual values
# from the WSGI start_response call.
response.setStatus(status.split()[0])
for key, value in headers:
response.setHeader(key, value)
wsgi_headers.write(
b'HTTP/1.1 ' + status.encode('ascii') + b'\r\n')
headers = b'\r\n'.join([
(k + ': ' + v).encode('ascii') for k, v in headers])
wsgi_headers.write(headers)
wsgi_headers.write(b'\r\n\r\n')
publish = partial(publish_module, _request=request, _response=response)
if handle_errors:
publish = HTTPExceptionHandler(publish)
wsgi_result = publish(env, start_response)
finally:
if user:
PAS._extractUserIds = orig_extractUserIds
# Restore security manager
setSecurityManager(sm)
# Restore site removed by closing of request
setSite(self.portal)
def publish(self, path, basic=None, env=None, extra=None,
request_method='GET', stdin=None, handle_errors=True,
user=None):
'''Publishes the object at 'path' returning a response object.
This is from Testing.ZopeTestCase.Functional, extended to support passing
a user id directly.
'''
if user:
assert not basic
from six.moves._thread import get_ident
me = get_ident()
def _extractUserIds(pas, request, plugins):
if me == get_ident():
info = pas._verifyUser(plugins, user)
return [(info['id'], info['login'])] if info else ()
return mock.DEFAULT
user_context = mock.patch(
'Products.PluggableAuthService.PluggableAuthService.PluggableAuthService._extractUserIds',
side_effect=_extractUserIds,
autospec=True)
else:
user_context = nullcontext()
try:
with user_context:
return super(ERP5TypeTestCaseMixin, self).publish(
path,
basic=basic,
env=env,
extra=extra,
request_method=request_method,
stdin=stdin,
handle_errors=handle_errors,
)
finally:
# Make sure that the skin cache does not have objects that were
# loaded with the connection used by the requested url.
self.changeSkin(self.portal.getCurrentSkinName())
return ResponseWrapper(response, outstream, path,
wsgi_result, wsgi_headers)
def getConsistencyMessageList(self, obj):
return sorted([ str(message.getMessage())
for message in obj.checkConsistency() ])
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment