Commit e64c2b87 authored by Romain Courteaud's avatar Romain Courteaud

Allow other roles than Owner to be catalogued in a monovalued column.

Monovalued column only index user ID, and no security group.
If some security groups are assigned to such role, they are still catalogued in
the roles_and_users table.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@25449 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent a5499bba
No related merge requests found
......@@ -103,17 +103,9 @@ class IndexableObjectWrapper(CMFCoreIndexableObjectWrapper):
else:
self.__dict__[name] = value
def allowedRolesAndUsers(self):
"""
Return a list of roles and users with View permission.
Used by Portal Catalog to filter out items you're not allowed to see.
WARNING (XXX): some user base local role association is currently
being stored (ex. to be determined). This should be prevented or it will
make the table explode. To analyse the symptoms, look at the
user_and_roles table. You will find some user:foo values
which are not necessary.
"""
def _getSecurityParameterList(self):
result_key = '_cache_result'
if result_key not in self.__dict__:
ob = self.__ob
security_product = getSecurityProduct(ob.acl_users)
withnuxgroups = security_product == SECURITY_USING_NUX_USER_GROUPS
......@@ -147,6 +139,12 @@ class IndexableObjectWrapper(CMFCoreIndexableObjectWrapper):
if len(new_role_list)>0:
flat_localroles[key] = new_role_list
localroles = flat_localroles
portal = self.getPortalObject()
role_dict = dict(portal.portal_catalog.getSQLCatalog().\
getSQLCatalogRoleKeysList())
getUserById = portal.acl_users.getUserById
# For each local role of a user:
# If the local role grants View permission, add it.
# Every addition implies 2 lines:
......@@ -154,18 +152,72 @@ class IndexableObjectWrapper(CMFCoreIndexableObjectWrapper):
# user:<user_id>:<role_id>
# A line must not be present twice in final result.
allowed = sets.Set(rolesForPermissionOn('View', ob))
# XXX Owner is hardcoded, in order to prevent searching for user on the
# site root.
allowed.discard('Owner')
add = allowed.add
user_role_dict = {}
user_view_permission_role_dict = {}
for user, roles in localroles.iteritems():
if withnuxgroups:
prefix = user
else:
prefix = 'user:' + user
for role in roles:
if (role in role_dict) and (getUserById(user) is not None):
# If role is monovalued, check if key is a user.
# If not, continue to index it in roles_and_users table.
user_role_dict[role] = user
if role in allowed:
user_view_permission_role_dict[role] = user
elif role in allowed:
add(prefix)
add(prefix + ':' + role)
return list(allowed)
# __setattr__ explicitely set the parameter on the wrapper
setattr(self, result_key,
(list(allowed), user_role_dict,
user_view_permission_role_dict))
# Return expected value
return self.__dict__[result_key]
def allowedRolesAndUsers(self):
"""
Return a list of roles and users with View permission.
Used by Portal Catalog to filter out items you're not allowed to see.
WARNING (XXX): some user base local role association is currently
being stored (ex. to be determined). This should be prevented or it will
make the table explode. To analyse the symptoms, look at the
user_and_roles table. You will find some user:foo values
which are not necessary.
"""
return self._getSecurityParameterList()[0]
def getAssignee(self):
"""Returns the user ID of the user with 'Assignee' local role on this
document.
If there is more than one Assignee local role, the result is undefined.
"""
return self._getSecurityParameterList()[1].get('Assignee', None)
def getViewPermissionAssignee(self):
"""Returns the user ID of the user with 'Assignee' local role on this
document, if the Assignee role has View permission.
If there is more than one Assignee local role, the result is undefined.
"""
return self._getSecurityParameterList()[2].get('Assignee', None)
def getViewPermissionAssignor(self):
"""Returns the user ID of the user with 'Assignor' local role on this
document, if the Assignor role has View permission.
If there is more than one Assignor local role, the result is undefined.
"""
return self._getSecurityParameterList()[2].get('Assignor', None)
def __repr__(self):
return '<Products.ERP5Catalog.CatalogTool.IndexableObjectWrapper'\
......@@ -467,16 +519,31 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
catalog = self.getSQLCatalog(sql_catalog_id)
column_map = catalog.getColumnMap()
# We only consider here the Owner role (since it was not indexed)
# since some objects may only be visible by their owner
# which was not indexed
for role, column_id in catalog.getSQLCatalogRoleKeysList():
# XXX This should be a list
if not user_is_superuser:
try:
# if called by an executable with proxy roles, we don't use
# owner, but only roles from the proxy.
eo = getSecurityManager()._context.stack[-1]
proxy_roles = getattr(eo, '_proxy_roles', None)
if not proxy_roles:
role_column_dict[column_id] = user_str
except IndexError:
role_column_dict[column_id] = user_str
# Patch for ERP5 by JP Smets in order
# to implement worklists and search of local roles
if kw.has_key('local_roles'):
local_roles = kw['local_roles']
local_roles = kw.get('local_roles', None)
if local_roles:
local_role_dict = dict(catalog.getSQLCatalogLocalRoleKeysList())
role_dict = dict(catalog.getSQLCatalogRoleKeysList())
# XXX user is not enough - we should also include groups of the user
# Only consider local_roles if it is not empty
if local_roles not in (None, '', []): # XXX: Maybe "if local_roles:" is enough.
new_allowedRolesAndUsers = []
new_role_column_dict = {}
# Turn it into a list if necessary according to ';' separator
if isinstance(local_roles, str):
local_roles = local_roles.split(';')
......@@ -503,29 +570,12 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
# XXX This should be a list
# which also includes all user groups
column_id = role_dict[role]
role_column_dict[column_id] = user_str
else:
# Else, we use the standard approach
new_role_column_dict[column_id] = user_str
new_allowedRolesAndUsers.append('%s:%s' % (user_or_group, role))
if local_role_column_dict == {}:
allowedRolesAndUsers = new_allowedRolesAndUsers
role_column_dict = new_role_column_dict
else:
# We only consider here the Owner role (since it was not indexed)
# since some objects may only be visible by their owner
# which was not indexed
for role, column_id in catalog.getSQLCatalogRoleKeysList():
# XXX This should be a list
if not user_is_superuser:
try:
# if called by an executable with proxy roles, we don't use
# owner, but only roles from the proxy.
eo = getSecurityManager()._context.stack[-1]
proxy_roles = getattr(eo, '_proxy_roles', None)
if not proxy_roles:
role_column_dict[column_id] = user_str
except IndexError:
role_column_dict[column_id] = user_str
return allowedRolesAndUsers, role_column_dict, local_role_column_dict
......
......@@ -2287,6 +2287,17 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
portal = self.getPortalObject()
portal_types = portal.portal_types
uf = self.getPortal().acl_users
uf._doAddUser('foo', 'foo', ['Member', ], [])
uf._doAddUser('ERP5TypeTestCase', 'ERP5TypeTestCase', ['Member', ], [])
get_transaction().commit()
portal_catalog = self.getCatalogTool()
portal_catalog.manage_catalogClear()
self.getPortal().ERP5Site_reindexAll()
get_transaction().commit()
self.tic()
# Person stuff
person_module = portal.person_module
person = 'Person'
......@@ -2592,6 +2603,710 @@ VALUES
sql_catalog.sql_search_tables = current_sql_search_tables
get_transaction().commit()
def test_MonoValueAssigneeIndexing(self, quiet=quiet, run=run_all_test):
if not run:
return
login = PortalTestCase.login
logout = self.logout
user1 = 'local_foo'
user2 = 'local_bar'
uf = self.getPortal().acl_users
uf._doAddUser(user1, user1, ['Member', ], [])
uf._doAddUser(user2, user2, ['Member', ], [])
perm = 'View'
folder = self.getOrganisationModule()
folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
folder.manage_setLocalRoles(user2, ['Author', 'Auditor'])
portal_type = 'Organisation'
sql_connection = self.getSQLConnection()
login(self, user2)
obj2 = folder.newContent(portal_type=portal_type)
obj2.manage_setLocalRoles(user1, ['Auditor'])
obj2.manage_permission(perm, ['Assignee', 'Auditor'], 0)
login(self, user1)
obj = folder.newContent(portal_type=portal_type)
obj.manage_setLocalRoles(user1, ['Assignee', 'Auditor'])
# Check that nothing is returned when user can not view the object
obj.manage_permission(perm, [], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, ], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
self.assertSameSet([], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, ], [x.getObject() for x in result])
# Check that object is returned when he can view the object
obj.manage_permission(perm, ['Auditor'], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
self.assertSameSet([], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
# Check that object is returned when he can view the object
obj.manage_permission(perm, ['Assignee'], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
self.assertSameSet([obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, ], [x.getObject() for x in result])
# Add a new table to the catalog
sql_catalog = self.portal.portal_catalog.getSQLCatalog()
local_roles_table = "test_assignee_local_roles"
create_local_role_table_sql = """
CREATE TABLE `%s` (
`uid` BIGINT UNSIGNED NOT NULL,
`assignee_reference` varchar(32) NOT NULL default '',
`viewable_assignee_reference` varchar(32) NOT NULL default '',
PRIMARY KEY (`uid`),
KEY `assignee_reference` (`assignee_reference`),
KEY `viewable_assignee_reference` (`viewable_assignee_reference`)
) TYPE=InnoDB;
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z_create_%s' % local_roles_table,
title = '',
arguments = "",
connection_id = 'erp5_sql_connection',
template = create_local_role_table_sql)
drop_local_role_table_sql = """
DROP TABLE IF EXISTS %s
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z0_drop_%s' % local_roles_table,
title = '',
arguments = "",
connection_id = 'erp5_sql_connection',
template = drop_local_role_table_sql)
catalog_local_role_sql = """
REPLACE INTO
%s
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(uid))">
(
<dtml-sqlvar expr="uid[loop_item]" type="int">,
<dtml-sqlvar expr="getAssignee[loop_item] or ''" type="string" optional>,
<dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional>
)
<dtml-if sequence-end>
<dtml-else>
,
</dtml-if>
</dtml-in>
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z_catalog_%s_list' % local_roles_table,
title = '',
connection_id = 'erp5_sql_connection',
arguments = "\n".join(['uid',
'getAssignee',
'getViewPermissionAssignee']),
template = catalog_local_role_sql)
get_transaction().commit()
current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
sql_catalog.sql_catalog_object_list = \
current_sql_catalog_object_list + \
('z_catalog_%s_list' % local_roles_table,)
current_sql_clear_catalog = sql_catalog.sql_clear_catalog
sql_catalog.sql_clear_catalog = \
current_sql_clear_catalog + \
('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
current_sql_catalog_local_role_keys = \
sql_catalog.sql_catalog_local_role_keys
sql_catalog.sql_catalog_local_role_keys = ('Assignee | %s.assignee_reference' % \
local_roles_table,)
current_sql_catalog_role_keys = \
sql_catalog.sql_catalog_role_keys
sql_catalog.sql_catalog_role_keys = (
'Assignee | %s.viewable_assignee_reference' % \
local_roles_table,)
current_sql_search_tables = sql_catalog.sql_search_tables
sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
[local_roles_table]
get_transaction().commit()
try:
# Clear catalog
portal_catalog = self.getCatalogTool()
portal_catalog.manage_catalogClear()
get_transaction().commit()
self.portal.portal_caches.clearAllCache()
get_transaction().commit()
obj2.reindexObject()
# Check that nothing is returned when user can not view the object
obj.manage_permission(perm, [], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, ], [x.getObject() for x in result])
method = obj.portal_catalog
result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
self.assertSameSet([], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, ], [x.getObject() for x in result])
# Check that object is returned when he can view the object
obj.manage_permission(perm, ['Auditor'], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
self.assertSameSet([obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
# Check that object is returned when he can view the object
obj.manage_permission(perm, ['Assignee'], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee')
self.assertSameSet([obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, ], [x.getObject() for x in result])
finally:
sql_catalog.sql_catalog_object_list = \
current_sql_catalog_object_list
sql_catalog.sql_clear_catalog = \
current_sql_clear_catalog
sql_catalog.sql_catalog_local_role_keys = \
current_sql_catalog_local_role_keys
sql_catalog.sql_catalog_role_keys = \
current_sql_catalog_role_keys
sql_catalog.sql_search_tables = current_sql_search_tables
get_transaction().commit()
def test_UserOrGroupRoleIndexing(self, quiet=quiet, run=run_all_test):
if not run:
return
login = PortalTestCase.login
logout = self.logout
user1 = 'a_great_user_name'
user1_group = 'a_great_user_group'
uf = self.getPortal().acl_users
uf._doAddUser(user1, user1, ['Member', ], [])
uf.zodb_groups.addGroup( user1_group, user1_group, user1_group)
new = uf.zodb_groups.addPrincipalToGroup( user1, user1_group )
perm = 'View'
folder = self.getOrganisationModule()
folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
portal_type = 'Organisation'
organisation = folder.newContent(portal_type=portal_type)
sql_connection = self.getSQLConnection()
def query(sql):
result = sql_connection.manage_test(sql)
return result.dictionaries()
login(self, user1)
# Add a new table to the catalog
sql_catalog = self.portal.portal_catalog.getSQLCatalog()
local_roles_table = "test_user_or_group_local_roles"
create_local_role_table_sql = """
CREATE TABLE `%s` (
`uid` BIGINT UNSIGNED NOT NULL,
`assignee_reference` varchar(32) NOT NULL default '',
`viewable_assignee_reference` varchar(32) NOT NULL default '',
PRIMARY KEY (`uid`),
KEY `assignee_reference` (`assignee_reference`),
KEY `viewable_assignee_reference` (`viewable_assignee_reference`)
) TYPE=InnoDB;
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z_create_%s' % local_roles_table,
title = '',
arguments = "",
connection_id = 'erp5_sql_connection',
template = create_local_role_table_sql)
drop_local_role_table_sql = """
DROP TABLE IF EXISTS %s
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z0_drop_%s' % local_roles_table,
title = '',
arguments = "",
connection_id = 'erp5_sql_connection',
template = drop_local_role_table_sql)
catalog_local_role_sql = """
REPLACE INTO
%s
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(uid))">
(
<dtml-sqlvar expr="uid[loop_item]" type="int">,
<dtml-sqlvar expr="getAssignee[loop_item] or ''" type="string" optional>,
<dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional>
)
<dtml-if sequence-end>
<dtml-else>
,
</dtml-if>
</dtml-in>
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z_catalog_%s_list' % local_roles_table,
title = '',
connection_id = 'erp5_sql_connection',
arguments = "\n".join(['uid',
'getAssignee',
'getViewPermissionAssignee']),
template = catalog_local_role_sql)
get_transaction().commit()
current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
sql_catalog.sql_catalog_object_list = \
current_sql_catalog_object_list + \
('z_catalog_%s_list' % local_roles_table,)
current_sql_clear_catalog = sql_catalog.sql_clear_catalog
sql_catalog.sql_clear_catalog = \
current_sql_clear_catalog + \
('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
current_sql_catalog_local_role_keys = \
sql_catalog.sql_catalog_local_role_keys
sql_catalog.sql_catalog_local_role_keys = (
'Owner | viewable_owner',
'Assignee | %s.assignee_reference' % \
local_roles_table,)
current_sql_catalog_role_keys = \
sql_catalog.sql_catalog_role_keys
sql_catalog.sql_catalog_role_keys = (
'Assignee | %s.viewable_assignee_reference' % \
local_roles_table,)
current_sql_search_tables = sql_catalog.sql_search_tables
sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
[local_roles_table]
portal = self.getPortal()
get_transaction().commit()
try:
# Clear catalog
portal_catalog = self.getCatalogTool()
portal_catalog.manage_catalogClear()
get_transaction().commit()
self.portal.portal_caches.clearAllCache()
get_transaction().commit()
organisation_relative_url = organisation.getRelativeUrl()
countResults = organisation.portal_catalog.countResults
count_result_kw = {'relative_url': organisation_relative_url}
use_case_number = 0
for view_permission_role_list, security_group_list, \
global_view, associate_view, assignee_view, both_view in \
[
# No view permission
([], [], 0, 0, 0, 0),
([], [(user1, ['Associate'])], 0, 0, 0, 0),
([], [(user1, ['Assignee'])], 0, 0, 0, 0),
([], [(user1, ['Assignee', 'Associate'])], 0, 0, 0, 0),
([], [(user1_group, ['Assignee'])], 0, 0, 0, 0),
([], [(user1_group, ['Assignee', 'Associate'])], 0, 0, 0, 0),
([], [(user1, ['Assignee']),
(user1_group, ['Assignee'])], 0, 0, 0, 0),
([], [(user1, ['Assignee']),
(user1_group, ['Assignee', 'Associate'])], 0, 0, 0, 0),
# View permission for Assignee
(['Assignee'], [], 0, 0, 0, 0),
(['Assignee'], [(user1, ['Associate'])], 0, 0, 0, 0),
(['Assignee'], [(user1, ['Assignee'])], 1, 0, 1, 1),
(['Assignee'], [(user1, ['Assignee', 'Associate'])], 1, 0, 1, 1),
(['Assignee'], [(user1_group, ['Assignee'])], 1, 0, 0, 0),
(['Assignee'], [(user1_group, ['Assignee', 'Associate'])],
1, 0, 0, 0),
(['Assignee'], [(user1, ['Assignee']),
(user1_group, ['Assignee'])], 1, 0, 1, 1),
(['Assignee'], [(user1, ['Assignee']),
(user1_group, ['Assignee', 'Associate'])],
1, 0, 1, 1),
# View permission for Associate
(['Associate'], [], 0, 0, 0, 0),
(['Associate'], [(user1, ['Associate'])], 1, 1, 0, 0),
(['Associate'], [(user1, ['Assignee'])], 0, 0, 0, 0),
(['Associate'], [(user1, ['Assignee', 'Associate'])], 1, 1, 1, 1),
(['Associate'], [(user1_group, ['Assignee'])], 0, 0, 0, 0),
(['Associate'], [(user1_group, ['Assignee', 'Associate'])],
1, 1, 0, 0),
(['Associate'], [(user1, ['Assignee']),
(user1_group, ['Assignee'])], 0, 0, 0, 0),
(['Associate'], [(user1, ['Assignee']),
(user1_group, ['Assignee', 'Associate'])],
1, 1, 1, 1),
# View permission for Associate and Assignee
(['Associate', 'Assignee'], [], 0, 0, 0, 0),
(['Associate', 'Assignee'], [(user1, ['Associate'])], 1, 1, 0, 0),
(['Associate', 'Assignee'], [(user1, ['Assignee'])], 1, 0, 1, 1),
(['Associate', 'Assignee'],
[(user1, ['Assignee', 'Associate'])], 1, 1, 1, 1),
(['Associate', 'Assignee'],
[(user1_group, ['Assignee'])], 1, 0, 0, 0),
(['Associate', 'Assignee'],
[(user1_group, ['Assignee', 'Associate'])], 1, 1, 0, 0),
(['Associate', 'Assignee'], [(user1, ['Assignee']),
(user1_group, ['Assignee'])], 1, 0, 1, 1),
(['Associate', 'Assignee'], [(user1, ['Assignee']),
(user1_group, ['Assignee', 'Associate'])],
1, 1, 1, 1),
]:
use_case_number += 1
organisation.manage_permission(perm, view_permission_role_list, 0)
organisation.manage_delLocalRoles([user1, user1_group])
for security_group, local_role_list in security_group_list:
organisation.manage_setLocalRoles(security_group, local_role_list)
organisation.reindexObject()
get_transaction().commit()
self.tic()
for expected_result, local_roles in \
[
(global_view, None),
(associate_view, 'Associate'),
(assignee_view, 'Assignee'),
(both_view, ['Associate', 'Assignee']),
]:
object_security_uid = query(
'SELECT security_uid FROM catalog WHERE relative_url="%s"' % \
organisation_relative_url
)[0]['security_uid']
if object_security_uid is not None:
roles_and_users = query(
'SELECT allowedRolesAndUsers FROM roles_and_users WHERE uid="%s"' % \
object_security_uid
)
else:
roles_and_users = ''
monovalue_references = query(
'SELECT * FROM test_user_or_group_local_roles WHERE uid="%s"' % \
organisation.getUid())[0]
assignee_reference = monovalue_references['assignee_reference']
viewable_assignee_reference = \
monovalue_references['viewable_assignee_reference']
result = countResults(local_roles=local_roles, **count_result_kw)[0][0]
if result != expected_result:
countResults(local_roles=local_roles, src__=1,
**count_result_kw)
self.fail('Use case %s\n\tView permission is given to: %s\n\t' \
'Local roles are: %s\n\t' \
'local_roles parameter is: %s\n\t' \
'Object IS %s returned by portal_catalog!\n\t' \
'\n\tSecurity uid is: %s\n\t'
'Roles and users: %s\n\t'
'assignee_reference: %s\n\t'
'viewable_assignee_reference: %s\n\t'
'\n\tSQL generated: \n\n%s' \
'' % \
(use_case_number,
view_permission_role_list,
organisation.__ac_local_roles__,
local_roles, ['NOT', ''][result],
object_security_uid,
str([x['allowedRolesAndUsers'] for x in roles_and_users]),
assignee_reference,
viewable_assignee_reference,
countResults(local_roles=local_roles, src__=1,
**count_result_kw)))
finally:
sql_catalog.sql_catalog_object_list = \
current_sql_catalog_object_list
sql_catalog.sql_clear_catalog = \
current_sql_clear_catalog
sql_catalog.sql_catalog_local_role_keys = \
current_sql_catalog_local_role_keys
sql_catalog.sql_catalog_role_keys = \
current_sql_catalog_role_keys
sql_catalog.sql_search_tables = current_sql_search_tables
get_transaction().commit()
def test_UserOrGroupLocalRoleIndexing(self, quiet=quiet, run=run_all_test):
if not run:
return
login = PortalTestCase.login
logout = self.logout
user1 = 'another_great_user_name'
user1_group = 'another_great_user_group'
uf = self.getPortal().acl_users
uf._doAddUser(user1, user1, ['Member', ], [])
uf.zodb_groups.addGroup( user1_group, user1_group, user1_group)
new = uf.zodb_groups.addPrincipalToGroup( user1, user1_group )
perm = 'View'
folder = self.getOrganisationModule()
folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
portal_type = 'Organisation'
organisation = folder.newContent(portal_type=portal_type)
sql_connection = self.getSQLConnection()
def query(sql):
result = sql_connection.manage_test(sql)
return result.dictionaries()
login(self, user1)
# Add a new table to the catalog
sql_catalog = self.portal.portal_catalog.getSQLCatalog()
local_roles_table = "another_test_user_or_group_local_roles"
create_local_role_table_sql = """
CREATE TABLE `%s` (
`uid` BIGINT UNSIGNED NOT NULL,
`viewable_assignee_reference` varchar(32) NOT NULL default '',
PRIMARY KEY (`uid`),
KEY `viewable_assignee_reference` (`viewable_assignee_reference`)
) TYPE=InnoDB;
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z_create_%s' % local_roles_table,
title = '',
arguments = "",
connection_id = 'erp5_sql_connection',
template = create_local_role_table_sql)
drop_local_role_table_sql = """
DROP TABLE IF EXISTS %s
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z0_drop_%s' % local_roles_table,
title = '',
arguments = "",
connection_id = 'erp5_sql_connection',
template = drop_local_role_table_sql)
catalog_local_role_sql = """
REPLACE INTO
%s
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(uid))">
(
<dtml-sqlvar expr="uid[loop_item]" type="int">,
<dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional>
)
<dtml-if sequence-end>
<dtml-else>
,
</dtml-if>
</dtml-in>
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z_catalog_%s_list' % local_roles_table,
title = '',
connection_id = 'erp5_sql_connection',
arguments = "\n".join(['uid',
'getViewPermissionAssignee']),
template = catalog_local_role_sql)
get_transaction().commit()
current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
sql_catalog.sql_catalog_object_list = \
current_sql_catalog_object_list + \
('z_catalog_%s_list' % local_roles_table,)
current_sql_clear_catalog = sql_catalog.sql_clear_catalog
sql_catalog.sql_clear_catalog = \
current_sql_clear_catalog + \
('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
current_sql_catalog_role_keys = \
sql_catalog.sql_catalog_role_keys
sql_catalog.sql_catalog_role_keys = (
'Owner | viewable_owner',
'Assignee | %s.viewable_assignee_reference' % \
local_roles_table,)
current_sql_search_tables = sql_catalog.sql_search_tables
sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
[local_roles_table]
portal = self.getPortal()
get_transaction().commit()
try:
# Clear catalog
portal_catalog = self.getCatalogTool()
portal_catalog.manage_catalogClear()
get_transaction().commit()
self.portal.portal_caches.clearAllCache()
get_transaction().commit()
organisation_relative_url = organisation.getRelativeUrl()
countResults = organisation.portal_catalog.countResults
count_result_kw = {'relative_url': organisation_relative_url}
use_case_number = 0
for view_permission_role_list, security_group_list, \
associate_view, assignee_view in \
[
# No view permission
([], [], 0, 0),
([], [(user1, ['Associate'])], 0, 0),
([], [(user1, ['Assignee'])], 0, 0),
([], [(user1, ['Assignee', 'Associate'])], 0, 0),
([], [(user1_group, ['Assignee'])], 0, 0),
([], [(user1_group, ['Assignee', 'Associate'])], 0, 0),
([], [(user1, ['Assignee']),
(user1_group, ['Assignee'])], 0, 0),
([], [(user1, ['Assignee']),
(user1_group, ['Assignee', 'Associate'])], 0, 0),
# View permission for Assignee
(['Assignee'], [], 0, 0),
(['Assignee'], [(user1, ['Associate'])], 0, 0),
(['Assignee'], [(user1, ['Assignee'])], 0, 1),
(['Assignee'], [(user1, ['Assignee', 'Associate'])], 0, 1),
(['Assignee'], [(user1_group, ['Assignee'])], 0, 1),
(['Assignee'], [(user1_group, ['Assignee', 'Associate'])], 0, 1),
(['Assignee'], [(user1, ['Assignee']),
(user1_group, ['Assignee'])], 0, 1),
(['Assignee'], [(user1, ['Assignee']),
(user1_group, ['Assignee', 'Associate'])], 0, 1),
# View permission for Associate
(['Associate'], [], 0, 0),
(['Associate'], [(user1, ['Associate'])], 1, 0),
(['Associate'], [(user1, ['Assignee'])], 0, 0),
(['Associate'], [(user1, ['Assignee', 'Associate'])], 1, 0),
(['Associate'], [(user1_group, ['Assignee'])], 0, 0),
(['Associate'], [(user1_group, ['Assignee', 'Associate'])], 1, 0),
(['Associate'], [(user1, ['Assignee']),
(user1_group, ['Assignee'])], 0, 0),
(['Associate'], [(user1, ['Assignee']),
(user1_group, ['Assignee', 'Associate'])], 1, 0),
# View permission for Associate and Assignee
(['Associate', 'Assignee'], [], 0, 0),
(['Associate', 'Assignee'], [(user1, ['Associate'])], 1, 0),
(['Associate', 'Assignee'], [(user1, ['Assignee'])], 0, 1),
(['Associate', 'Assignee'],
[(user1, ['Assignee', 'Associate'])], 1, 1),
(['Associate', 'Assignee'],
[(user1_group, ['Assignee'])], 0, 1),
(['Associate', 'Assignee'],
[(user1_group, ['Assignee', 'Associate'])], 1, 1),
(['Associate', 'Assignee'], [(user1, ['Assignee']),
(user1_group, ['Assignee'])], 0, 1),
(['Associate', 'Assignee'], [(user1, ['Assignee']),
(user1_group, ['Assignee', 'Associate'])], 1, 1),
]:
use_case_number += 1
organisation.manage_permission(perm, view_permission_role_list, 0)
organisation.manage_delLocalRoles([user1, user1_group])
for security_group, local_role_list in security_group_list:
organisation.manage_setLocalRoles(security_group, local_role_list)
organisation.reindexObject()
get_transaction().commit()
self.tic()
for expected_result, local_roles in \
[
(associate_view or assignee_view, None),
(associate_view, 'Associate'),
(assignee_view, 'Assignee'),
(associate_view or assignee_view, ['Associate', 'Assignee']),
]:
object_security_uid = query(
'SELECT security_uid FROM catalog WHERE relative_url="%s"' % \
organisation_relative_url
)[0]['security_uid']
if object_security_uid is not None:
roles_and_users = query(
'SELECT allowedRolesAndUsers FROM roles_and_users WHERE uid="%s"' % \
object_security_uid
)
else:
roles_and_users = ''
monovalue_references = query(
'SELECT * FROM %s WHERE uid="%s"' % \
(local_roles_table, organisation.getUid()))[0]
viewable_assignee_reference = \
monovalue_references['viewable_assignee_reference']
result = countResults(local_roles=local_roles, **count_result_kw)[0][0]
if result != expected_result:
countResults(local_roles=local_roles, src__=1,
**count_result_kw)
self.fail('Use case %s\n\tView permission is given to: %s\n\t' \
'Local roles are: %s\n\t' \
'local_roles parameter is: %s\n\t' \
'Object IS %s returned by portal_catalog!\n\t' \
'\n\tSecurity uid is: %s\n\t'
'Roles and users: %s\n\t'
'viewable_assignee_reference: %s\n\t'
'\n\tSQL generated: \n\n%s' \
'' % \
(use_case_number,
view_permission_role_list,
organisation.__ac_local_roles__,
local_roles, ['NOT', ''][result],
object_security_uid,
str([x['allowedRolesAndUsers'] for x in roles_and_users]),
viewable_assignee_reference,
countResults(local_roles=local_roles, src__=1,
**count_result_kw)))
finally:
sql_catalog.sql_catalog_object_list = \
current_sql_catalog_object_list
sql_catalog.sql_clear_catalog = \
current_sql_clear_catalog
sql_catalog.sql_catalog_role_keys = \
current_sql_catalog_role_keys
sql_catalog.sql_search_tables = current_sql_search_tables
get_transaction().commit()
def test_ObjectReindexationConcurency(self, quiet=quiet, run=run_all_test):
if not run:
return
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment