Commit c7a4958f authored by Vincent Pelletier's avatar Vincent Pelletier

erp5_core: Improve BaseExtensibleTraversableMixin._forceIdentification

Fix bugs:
- Fix an acquisition context bug: the user found here would be wrapped in
  the acquisition context of self, and as a result SecurityManager.validate
  may consider the user to be outside of the acquisition path of the
  document being checked (ex: when accessing a module while publishing a
  web section).
- While unusual, there may be multiple users matching a given request,
  which is handled by ZPublisher but was skipped here.

Also:
Document:
- Why this method is needed.
- assumptions made to get simpler code.
Improve performance:
- portal_membership._huntUser looks the user up twice, which is expensive.
  Stop using this method.
- When the request is a fake request (from restrictedTraverse) nothing can
  nor should be done, so bypass the entire logic that case.
- Assorted tiny improvements: do not retrieve security manager twice, avoid
  extraneous local assignments, ...
Improve coding style:
- Stop accessing portal_membership's underware.
- Stop accessing PluggableAuthenticationService's underware.
- Simplify disabled cache support: this is exceedingly rare, optimise for
  when it is enabled.
- Do not hardcode log level, also increase the severity: this really is a
  warning.
- Do not try to decode Basic-auth, this is the job of the user folder.
  This removes duplicated code.
parent 76bd4921
...@@ -28,23 +28,16 @@ ...@@ -28,23 +28,16 @@
############################################################################## ##############################################################################
from warnings import warn from warnings import warn
import six
if six.PY2:
from base64 import decodestring as decodebytes
else:
from base64 import decodebytes
from zLOG import LOG from zLOG import LOG, WARNING
from AccessControl import ClassSecurityInfo, getSecurityManager from AccessControl import ClassSecurityInfo, getSecurityManager
from AccessControl.SecurityManagement import newSecurityManager from AccessControl.SecurityManagement import newSecurityManager
from Products.CMFCore.utils import getToolByName
from erp5.component.mixin.ExtensibleTraversableMixin import ExtensibleTraversableMixin from erp5.component.mixin.ExtensibleTraversableMixin import ExtensibleTraversableMixin
from Products.ERP5Type.Cache import getReadOnlyTransactionCache from Products.ERP5Type.Cache import getReadOnlyTransactionCache
from Products.ERP5Type.Globals import InitializeClass from Products.ERP5Type.Globals import InitializeClass
from Products.ERP5Type import Permissions from Products.ERP5Type import Permissions
from Products.ERP5Type.Globals import get_request from Products.ERP5Type.Globals import get_request
from Products.ERP5Type.Utils import str2bytes, bytes2str
# XXX: these duplicate ones in ERP5.Document # XXX: these duplicate ones in ERP5.Document
_MARKER = [] _MARKER = []
...@@ -71,73 +64,74 @@ class BaseExtensibleTraversableMixin(ExtensibleTraversableMixin): ...@@ -71,73 +64,74 @@ class BaseExtensibleTraversableMixin(ExtensibleTraversableMixin):
def _forceIdentification(self, request): def _forceIdentification(self, request):
# force identification (usable for extensible content) # force identification (usable for extensible content)
# This is normally called from __bobo_traverse__ during publication.
# Publication works in two phases (for what matters here):
# - phase 1: traverse the entire path (request URL path)
# - phase 2: locate a user folder and find a user allowed to access
# published document
# This does not work for extensible traversable: here, we must look the
# document up, typically using catalog, and we need to have an
# authenticated user to do so.
# Because we do not expect to have multiple layers of user_folders (only
# one at the portal and one at zope root), and we are below both, we can
# already reliably look for the user up, authenticating the request and
# setting up a reasonable security context to use in catalog lookup
# (or executing other restricted code).
# XXX: this is certainly not the most elegant way of doing so.
old_manager = getSecurityManager()
cache = getReadOnlyTransactionCache() cache = getReadOnlyTransactionCache()
if cache is not None: if cache is None:
cache = {}
key = ('__bobo_traverse__', self, 'user') key = ('__bobo_traverse__', self, 'user')
try: try:
user = cache[key] user = cache[key]
except KeyError: except KeyError:
user = _MARKER old_user = old_manager.getUser()
else:
user = _MARKER
old_user = getSecurityManager().getUser()
if user is _MARKER:
user = None # By default, do nothing user = None # By default, do nothing
if old_user is None or old_user.getUserName() == 'Anonymous User': if old_user is None or old_user.getUserName() == 'Anonymous User':
portal_membership = getToolByName(self.getPortalObject(),
'portal_membership')
if portal_membership is not None:
try:
if request.get('PUBLISHED', _MARKER) is _MARKER:
# request['PUBLISHED'] is required by validate
request['PUBLISHED'] = self
has_published = False
else:
has_published = True
try: try:
auth = request._auth auth = request._auth
except AttributeError: except AttributeError:
# This kind of error happens with unrestrictedTraverse, # This kind of error happens with unrestrictedTraverse,
# because the request object is a fake, and it is just # because the request object is a fake, and it is just
# a dict object. # a dict object. Nothing can be done with such an object.
user = None user = None
else: else:
name = None need_published = 'PUBLISHED' not in request.other
acl_users = self.getPortalObject().acl_users
user_list = acl_users._extractUserIds(request, acl_users.plugins)
if len(user_list) > 0:
name = user_list[0][0]
else:
# this logic is copied from identify() in
# AccessControl.User.BasicUserFolder.
if auth and auth.lower().startswith('basic '):
name = bytes2str(decodebytes(str2bytes(auth.split(' ')[-1]))).split(':', 1)[0]
if name is not None:
user = portal_membership._huntUser(name, self)
else:
user = None
if not has_published:
try: try:
del request.other['PUBLISHED'] if need_published:
except AttributeError: # request['PUBLISHED'] is required by validate
# The same here as above. unrestrictedTraverse provides request['PUBLISHED'] = self
# just a plain dict, so request.other does not exist. portal = self.getPortalObject()
del request['PUBLISHED'] # XXX: this is a simplification of the user validation logic from ZPublisher
# - only two specific locations are checked for user folder existence
# - user folder name is hardcoded (instead of going though __allow_groups__)
# - no request.role handling
for user_folder_parent in (
portal,
portal.aq_parent,
):
user = user_folder_parent.acl_users.validate(request, auth)
if user is not None:
if user.getUserName() == 'Anonymous User':
# If the user which is connected is anonymous, do not try to change SecurityManager
user = None
continue
break
except Exception: except Exception:
LOG("ERP5 WARNING",0, LOG("ERP5 WARNING", WARNING,
"Failed to retrieve user in __bobo_traverse__ of WebSection %s" % self.getPath(), "Failed to retrieve user in __bobo_traverse__ of WebSection %s" % self.getPath(),
error=True) error=True)
user = None user = None
if user is not None and user.getUserName() == 'Anonymous User': finally:
user = None # If the user which is connected is anonymous, if need_published:
# do not try to change SecurityManager del request.other['PUBLISHED']
if cache is not None:
cache[key] = user cache[key] = user
if user is None:
old_manager = None old_manager = None
if user is not None: else:
# We need to perform identification # We need to perform identification
old_manager = getSecurityManager()
newSecurityManager(get_request(), user) newSecurityManager(get_request(), user)
return old_manager, user return old_manager, user
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment