Commit d07d2f01 authored by Jérome Perrin's avatar Jérome Perrin Committed by Arnaud Fontaine

ERP5TypeTestCase: stop copying ZopeTestCase's publish implementation

Call the original method instead of copy/pasting the code

We still keep the following patches:
 - user: this is an ERP5 addition and maybe we use it in customer project
   code (if we don't I'm in favor of removing the feature, it's used in
   only one place in this repo)
 - call `self.changeSkin(self.portal.getCurrentSkinName())`. Maybe this
   is needed for CMFCore tests and could be good to send upstream, I did
   not check this part
parent 6f6fd24c
...@@ -29,6 +29,7 @@ from glob import glob ...@@ -29,6 +29,7 @@ from glob import glob
from hashlib import md5 from hashlib import md5
from warnings import warn from warnings import warn
from DateTime import DateTime from DateTime import DateTime
import mock
import Products.ZMySQLDA.DA import Products.ZMySQLDA.DA
from Products.ZMySQLDA.DA import Connection as ZMySQLDA_Connection from Products.ZMySQLDA.DA import Connection as ZMySQLDA_Connection
...@@ -58,7 +59,7 @@ from zope.site.hooks import setSite ...@@ -58,7 +59,7 @@ from zope.site.hooks import setSite
from Testing import ZopeTestCase from Testing import ZopeTestCase
from Testing.makerequest import makerequest from Testing.makerequest import makerequest
from Testing.ZopeTestCase import PortalTestCase, user_name, ZopeLite from Testing.ZopeTestCase import PortalTestCase, user_name, ZopeLite, functional
from Products.ERP5Type.Core.Workflow import ValidationFailed from Products.ERP5Type.Core.Workflow import ValidationFailed
from Products.PythonScripts.PythonScript import PythonScript from Products.PythonScripts.PythonScript import PythonScript
from Products.ERP5Type.Accessor.Constant import PropertyGetter as ConstantGetter from Products.ERP5Type.Accessor.Constant import PropertyGetter as ConstantGetter
...@@ -103,6 +104,15 @@ from Products.ZSQLCatalog.SQLCatalog import Query ...@@ -103,6 +104,15 @@ from Products.ZSQLCatalog.SQLCatalog import Query
from Acquisition import aq_base from Acquisition import aq_base
if six.PY2:
from contextlib import contextmanager
@contextmanager
def nullcontext(enter_result=None):
yield enter_result
else:
from contextlib import nullcontext
portal_name = 'erp5_portal' portal_name = 'erp5_portal'
# we keep a reference to all sites for which setup failed the first time, to # we keep a reference to all sites for which setup failed the first time, to
...@@ -218,7 +228,7 @@ def _parse_args(self, *args, **kw): ...@@ -218,7 +228,7 @@ def _parse_args(self, *args, **kw):
_parse_args._original = DateTime._original_parse_args _parse_args._original = DateTime._original_parse_args
DateTime._parse_args = _parse_args DateTime._parse_args = _parse_args
class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase, functional.Functional):
"""Mixin class for ERP5 based tests. """Mixin class for ERP5 based tests.
""" """
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
...@@ -749,116 +759,46 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase): ...@@ -749,116 +759,46 @@ class ERP5TypeTestCaseMixin(ProcessingNodeTestCase, PortalTestCase):
ZopeTestCase._print('\n%s ' % message) ZopeTestCase._print('\n%s ' % message)
LOG('Testing ... ', DEBUG, message) LOG('Testing ... ', DEBUG, message)
def publish(self, path, basic=None, env=None, extra=None, user=None, def publish(self, path, basic=None, env=None, extra=None,
request_method='GET', stdin=None, handle_errors=True): request_method='GET', stdin=None, handle_errors=True,
'''Publishes the object at 'path' returning a response object.''' user=None):
'''Publishes the object at 'path' returning a response object.
from ZPublisher.HTTPResponse import WSGIResponse
from ZPublisher.WSGIPublisher import publish_module
from ZPublisher.httpexceptions import HTTPExceptionHandler
from Testing.ZopeTestCase.functional import ResponseWrapper
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
# Save current security manager
sm = getSecurityManager()
# Commit the sandbox for good measure
self.commit()
if env is None:
env = {}
request = self.app.REQUEST
env['SERVER_NAME'] = request['SERVER_NAME']
env['SERVER_PORT'] = request['SERVER_PORT']
env['HTTP_ACCEPT_CHARSET'] = request['HTTP_ACCEPT_CHARSET']
env['SERVER_PROTOCOL'] = 'HTTP/1.1'
env['REQUEST_METHOD'] = request_method
query = ''
if '?' in path:
path, query = path.split("?", 1)
if six.PY2:
env['PATH_INFO'] = unquote_to_bytes(path)
else:
env['PATH_INFO'] = unquote_to_bytes(path).decode('latin-1')
env['QUERY_STRING'] = query
if basic:
assert not user, (basic, user)
env['HTTP_AUTHORIZATION'] = "Basic %s" % \
base64.encodestring(basic).replace('\n', '')
elif user:
PAS = self.portal.acl_users.__class__
orig_extractUserIds = PAS._extractUserIds
from thread import get_ident
me = get_ident()
def _extractUserIds(pas, request, plugins):
if me == get_ident():
info = pas._verifyUser(plugins, user)
return [(info['id'], info['login'])] if info else ()
return orig_extractUserIds(pas, request, plugins)
if stdin is None:
stdin = BytesIO()
outstream = BytesIO()
response = WSGIResponse(stdout=outstream, stderr=sys.stderr)
try:
if user:
PAS._extractUserIds = _extractUserIds
# The following `HTTPRequest` object would be created anyway inside
# `publish_module_standard` if no `request` argument was given.
request = request.__class__(stdin, env, response)
# However, we need to inject the content of `extra` inside the
# request.
if extra:
for k, v in six.iteritems(extra): request[k] = v
wsgi_headers = BytesIO()
def start_response(status, headers):
# Keep the fake response in-sync with the actual values
# from the WSGI start_response call.
response.setStatus(status.split()[0])
for key, value in headers:
response.setHeader(key, value)
wsgi_headers.write(
b'HTTP/1.1 ' + status.encode('ascii') + b'\r\n')
headers = b'\r\n'.join([
(k + ': ' + v).encode('ascii') for k, v in headers])
wsgi_headers.write(headers)
wsgi_headers.write(b'\r\n\r\n')
publish = partial(publish_module, _request=request, _response=response)
if handle_errors:
publish = HTTPExceptionHandler(publish)
wsgi_result = publish(env, start_response)
finally:
if user:
PAS._extractUserIds = orig_extractUserIds
# Restore security manager
setSecurityManager(sm)
# Restore site removed by closing of request
setSite(self.portal)
This is from Testing.ZopeTestCase.Functional, extended to support passing
a user id directly.
'''
if user:
assert not basic
from six.moves._thread import get_ident
me = get_ident()
def _extractUserIds(pas, request, plugins):
if me == get_ident():
info = pas._verifyUser(plugins, user)
return [(info['id'], info['login'])] if info else ()
return mock.DEFAULT
user_context = mock.patch(
'Products.PluggableAuthService.PluggableAuthService.PluggableAuthService._extractUserIds',
side_effect=_extractUserIds,
autospec=True)
else:
user_context = nullcontext()
try:
with user_context:
return super(ERP5TypeTestCaseMixin, self).publish(
path,
basic=basic,
env=env,
extra=extra,
request_method=request_method,
stdin=stdin,
handle_errors=handle_errors,
)
finally:
# Make sure that the skin cache does not have objects that were # Make sure that the skin cache does not have objects that were
# loaded with the connection used by the requested url. # loaded with the connection used by the requested url.
self.changeSkin(self.portal.getCurrentSkinName()) self.changeSkin(self.portal.getCurrentSkinName())
return ResponseWrapper(response, outstream, path,
wsgi_result, wsgi_headers)
def getConsistencyMessageList(self, obj): def getConsistencyMessageList(self, obj):
return sorted([ str(message.getMessage()) return sorted([ str(message.getMessage())
for message in obj.checkConsistency() ]) for message in obj.checkConsistency() ])
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment