From e64c2b87d6ba06b4d17ca9de26234153b5531ebb Mon Sep 17 00:00:00 2001 From: Romain Courteaud <romain@nexedi.com> Date: Wed, 4 Feb 2009 16:47:14 +0000 Subject: [PATCH] Allow other roles than Owner to be catalogued in a monovalued column. Monovalued column only index user ID, and no security group. If some security groups are assigned to such role, they are still catalogued in the roles_and_users table. git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@25449 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/ERP5Catalog/CatalogTool.py | 178 +++-- product/ERP5Catalog/tests/testERP5Catalog.py | 715 +++++++++++++++++++ 2 files changed, 829 insertions(+), 64 deletions(-) diff --git a/product/ERP5Catalog/CatalogTool.py b/product/ERP5Catalog/CatalogTool.py index b4c9b2a6e6..9defb33276 100644 --- a/product/ERP5Catalog/CatalogTool.py +++ b/product/ERP5Catalog/CatalogTool.py @@ -103,17 +103,9 @@ class IndexableObjectWrapper(CMFCoreIndexableObjectWrapper): else: self.__dict__[name] = value - def allowedRolesAndUsers(self): - """ - Return a list of roles and users with View permission. - Used by Portal Catalog to filter out items you're not allowed to see. - - WARNING (XXX): some user base local role association is currently - being stored (ex. to be determined). This should be prevented or it will - make the table explode. To analyse the symptoms, look at the - user_and_roles table. You will find some user:foo values - which are not necessary. - """ + def _getSecurityParameterList(self): + result_key = '_cache_result' + if result_key not in self.__dict__: ob = self.__ob security_product = getSecurityProduct(ob.acl_users) withnuxgroups = security_product == SECURITY_USING_NUX_USER_GROUPS @@ -147,6 +139,12 @@ class IndexableObjectWrapper(CMFCoreIndexableObjectWrapper): if len(new_role_list)>0: flat_localroles[key] = new_role_list localroles = flat_localroles + + portal = self.getPortalObject() + role_dict = dict(portal.portal_catalog.getSQLCatalog().\ + getSQLCatalogRoleKeysList()) + getUserById = portal.acl_users.getUserById + # For each local role of a user: # If the local role grants View permission, add it. # Every addition implies 2 lines: @@ -154,18 +152,72 @@ class IndexableObjectWrapper(CMFCoreIndexableObjectWrapper): # user:<user_id>:<role_id> # A line must not be present twice in final result. allowed = sets.Set(rolesForPermissionOn('View', ob)) + # XXX Owner is hardcoded, in order to prevent searching for user on the + # site root. allowed.discard('Owner') add = allowed.add + user_role_dict = {} + user_view_permission_role_dict = {} for user, roles in localroles.iteritems(): if withnuxgroups: prefix = user else: prefix = 'user:' + user for role in roles: - if role in allowed: + if (role in role_dict) and (getUserById(user) is not None): + # If role is monovalued, check if key is a user. + # If not, continue to index it in roles_and_users table. + user_role_dict[role] = user + if role in allowed: + user_view_permission_role_dict[role] = user + elif role in allowed: add(prefix) add(prefix + ':' + role) - return list(allowed) + + # __setattr__ explicitely set the parameter on the wrapper + setattr(self, result_key, + (list(allowed), user_role_dict, + user_view_permission_role_dict)) + + # Return expected value + return self.__dict__[result_key] + + def allowedRolesAndUsers(self): + """ + Return a list of roles and users with View permission. + Used by Portal Catalog to filter out items you're not allowed to see. + + WARNING (XXX): some user base local role association is currently + being stored (ex. to be determined). This should be prevented or it will + make the table explode. To analyse the symptoms, look at the + user_and_roles table. You will find some user:foo values + which are not necessary. + """ + return self._getSecurityParameterList()[0] + + def getAssignee(self): + """Returns the user ID of the user with 'Assignee' local role on this + document. + + If there is more than one Assignee local role, the result is undefined. + """ + return self._getSecurityParameterList()[1].get('Assignee', None) + + def getViewPermissionAssignee(self): + """Returns the user ID of the user with 'Assignee' local role on this + document, if the Assignee role has View permission. + + If there is more than one Assignee local role, the result is undefined. + """ + return self._getSecurityParameterList()[2].get('Assignee', None) + + def getViewPermissionAssignor(self): + """Returns the user ID of the user with 'Assignor' local role on this + document, if the Assignor role has View permission. + + If there is more than one Assignor local role, the result is undefined. + """ + return self._getSecurityParameterList()[2].get('Assignor', None) def __repr__(self): return '<Products.ERP5Catalog.CatalogTool.IndexableObjectWrapper'\ @@ -467,65 +519,63 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject): catalog = self.getSQLCatalog(sql_catalog_id) column_map = catalog.getColumnMap() + # We only consider here the Owner role (since it was not indexed) + # since some objects may only be visible by their owner + # which was not indexed + for role, column_id in catalog.getSQLCatalogRoleKeysList(): + # XXX This should be a list + if not user_is_superuser: + try: + # if called by an executable with proxy roles, we don't use + # owner, but only roles from the proxy. + eo = getSecurityManager()._context.stack[-1] + proxy_roles = getattr(eo, '_proxy_roles', None) + if not proxy_roles: + role_column_dict[column_id] = user_str + except IndexError: + role_column_dict[column_id] = user_str + # Patch for ERP5 by JP Smets in order # to implement worklists and search of local roles - if kw.has_key('local_roles'): - local_roles = kw['local_roles'] + local_roles = kw.get('local_roles', None) + if local_roles: local_role_dict = dict(catalog.getSQLCatalogLocalRoleKeysList()) role_dict = dict(catalog.getSQLCatalogRoleKeysList()) # XXX user is not enough - we should also include groups of the user - # Only consider local_roles if it is not empty - if local_roles not in (None, '', []): # XXX: Maybe "if local_roles:" is enough. - new_allowedRolesAndUsers = [] - # Turn it into a list if necessary according to ';' separator - if isinstance(local_roles, str): - local_roles = local_roles.split(';') - # Local roles now has precedence (since it comes from a WorkList) - for user_or_group in allowedRolesAndUsers: - for role in local_roles: - # Performance optimisation - if local_role_dict.has_key(role): + new_allowedRolesAndUsers = [] + new_role_column_dict = {} + # Turn it into a list if necessary according to ';' separator + if isinstance(local_roles, str): + local_roles = local_roles.split(';') + # Local roles now has precedence (since it comes from a WorkList) + for user_or_group in allowedRolesAndUsers: + for role in local_roles: + # Performance optimisation + if local_role_dict.has_key(role): + # XXX This should be a list + # If a given role exists as a column in the catalog, + # then it is considered as single valued and indexed + # through the catalog. + if not user_is_superuser: # XXX This should be a list - # If a given role exists as a column in the catalog, - # then it is considered as single valued and indexed - # through the catalog. - if not user_is_superuser: - # XXX This should be a list - # which also includes all user groups - column_id = local_role_dict[role] - local_role_column_dict[column_id] = user_str - if role_dict.has_key(role): + # which also includes all user groups + column_id = local_role_dict[role] + local_role_column_dict[column_id] = user_str + if role_dict.has_key(role): + # XXX This should be a list + # If a given role exists as a column in the catalog, + # then it is considered as single valued and indexed + # through the catalog. + if not user_is_superuser: # XXX This should be a list - # If a given role exists as a column in the catalog, - # then it is considered as single valued and indexed - # through the catalog. - if not user_is_superuser: - # XXX This should be a list - # which also includes all user groups - column_id = role_dict[role] - role_column_dict[column_id] = user_str - else: - # Else, we use the standard approach - new_allowedRolesAndUsers.append('%s:%s' % (user_or_group, role)) - if local_role_column_dict == {}: - allowedRolesAndUsers = new_allowedRolesAndUsers + # which also includes all user groups + column_id = role_dict[role] + new_role_column_dict[column_id] = user_str + new_allowedRolesAndUsers.append('%s:%s' % (user_or_group, role)) + if local_role_column_dict == {}: + allowedRolesAndUsers = new_allowedRolesAndUsers + role_column_dict = new_role_column_dict - else: - # We only consider here the Owner role (since it was not indexed) - # since some objects may only be visible by their owner - # which was not indexed - for role, column_id in catalog.getSQLCatalogRoleKeysList(): - # XXX This should be a list - if not user_is_superuser: - try: - # if called by an executable with proxy roles, we don't use - # owner, but only roles from the proxy. - eo = getSecurityManager()._context.stack[-1] - proxy_roles = getattr(eo, '_proxy_roles', None) - if not proxy_roles: - role_column_dict[column_id] = user_str - except IndexError: - role_column_dict[column_id] = user_str return allowedRolesAndUsers, role_column_dict, local_role_column_dict diff --git a/product/ERP5Catalog/tests/testERP5Catalog.py b/product/ERP5Catalog/tests/testERP5Catalog.py index e79abef2bf..7793f7cf26 100644 --- a/product/ERP5Catalog/tests/testERP5Catalog.py +++ b/product/ERP5Catalog/tests/testERP5Catalog.py @@ -2286,6 +2286,17 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): sql_connection = self.getSQLConnection() portal = self.getPortalObject() portal_types = portal.portal_types + + uf = self.getPortal().acl_users + uf._doAddUser('foo', 'foo', ['Member', ], []) + uf._doAddUser('ERP5TypeTestCase', 'ERP5TypeTestCase', ['Member', ], []) + get_transaction().commit() + portal_catalog = self.getCatalogTool() + portal_catalog.manage_catalogClear() + self.getPortal().ERP5Site_reindexAll() + get_transaction().commit() + self.tic() + # Person stuff person_module = portal.person_module @@ -2592,6 +2603,710 @@ VALUES sql_catalog.sql_search_tables = current_sql_search_tables get_transaction().commit() + def test_MonoValueAssigneeIndexing(self, quiet=quiet, run=run_all_test): + if not run: + return + + login = PortalTestCase.login + logout = self.logout + user1 = 'local_foo' + user2 = 'local_bar' + uf = self.getPortal().acl_users + uf._doAddUser(user1, user1, ['Member', ], []) + uf._doAddUser(user2, user2, ['Member', ], []) + + perm = 'View' + folder = self.getOrganisationModule() + folder.manage_setLocalRoles(user1, ['Author', 'Auditor']) + folder.manage_setLocalRoles(user2, ['Author', 'Auditor']) + portal_type = 'Organisation' + + sql_connection = self.getSQLConnection() + + login(self, user2) + obj2 = folder.newContent(portal_type=portal_type) + obj2.manage_setLocalRoles(user1, ['Auditor']) + obj2.manage_permission(perm, ['Assignee', 'Auditor'], 0) + + login(self, user1) + + obj = folder.newContent(portal_type=portal_type) + obj.manage_setLocalRoles(user1, ['Assignee', 'Auditor']) + + # Check that nothing is returned when user can not view the object + obj.manage_permission(perm, [], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee') + self.assertSameSet([], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + + # Check that object is returned when he can view the object + obj.manage_permission(perm, ['Auditor'], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee') + self.assertSameSet([], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + + # Check that object is returned when he can view the object + obj.manage_permission(perm, ['Assignee'], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee') + self.assertSameSet([obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + + # Add a new table to the catalog + sql_catalog = self.portal.portal_catalog.getSQLCatalog() + + local_roles_table = "test_assignee_local_roles" + + create_local_role_table_sql = """ +CREATE TABLE `%s` ( + `uid` BIGINT UNSIGNED NOT NULL, + `assignee_reference` varchar(32) NOT NULL default '', + `viewable_assignee_reference` varchar(32) NOT NULL default '', + PRIMARY KEY (`uid`), + KEY `assignee_reference` (`assignee_reference`), + KEY `viewable_assignee_reference` (`viewable_assignee_reference`) +) TYPE=InnoDB; + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z_create_%s' % local_roles_table, + title = '', + arguments = "", + connection_id = 'erp5_sql_connection', + template = create_local_role_table_sql) + + drop_local_role_table_sql = """ +DROP TABLE IF EXISTS %s + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z0_drop_%s' % local_roles_table, + title = '', + arguments = "", + connection_id = 'erp5_sql_connection', + template = drop_local_role_table_sql) + + catalog_local_role_sql = """ +REPLACE INTO + %s +VALUES +<dtml-in prefix="loop" expr="_.range(_.len(uid))"> +( + <dtml-sqlvar expr="uid[loop_item]" type="int">, + <dtml-sqlvar expr="getAssignee[loop_item] or ''" type="string" optional>, + <dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional> +) +<dtml-if sequence-end> +<dtml-else> +, +</dtml-if> +</dtml-in> + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z_catalog_%s_list' % local_roles_table, + title = '', + connection_id = 'erp5_sql_connection', + arguments = "\n".join(['uid', + 'getAssignee', + 'getViewPermissionAssignee']), + template = catalog_local_role_sql) + + get_transaction().commit() + current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list + sql_catalog.sql_catalog_object_list = \ + current_sql_catalog_object_list + \ + ('z_catalog_%s_list' % local_roles_table,) + current_sql_clear_catalog = sql_catalog.sql_clear_catalog + sql_catalog.sql_clear_catalog = \ + current_sql_clear_catalog + \ + ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table) + current_sql_catalog_local_role_keys = \ + sql_catalog.sql_catalog_local_role_keys + sql_catalog.sql_catalog_local_role_keys = ('Assignee | %s.assignee_reference' % \ + local_roles_table,) + + current_sql_catalog_role_keys = \ + sql_catalog.sql_catalog_role_keys + sql_catalog.sql_catalog_role_keys = ( + 'Assignee | %s.viewable_assignee_reference' % \ + local_roles_table,) + + current_sql_search_tables = sql_catalog.sql_search_tables + sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \ + [local_roles_table] + get_transaction().commit() + + try: + # Clear catalog + portal_catalog = self.getCatalogTool() + portal_catalog.manage_catalogClear() + get_transaction().commit() + self.portal.portal_caches.clearAllCache() + get_transaction().commit() + obj2.reindexObject() + + # Check that nothing is returned when user can not view the object + obj.manage_permission(perm, [], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + method = obj.portal_catalog + result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee') + self.assertSameSet([], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + + # Check that object is returned when he can view the object + obj.manage_permission(perm, ['Auditor'], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee') + self.assertSameSet([obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + + # Check that object is returned when he can view the object + obj.manage_permission(perm, ['Assignee'], 0) + obj.reindexObject() + get_transaction().commit() + self.tic() + result = obj.portal_catalog(portal_type=portal_type) + self.assertSameSet([obj2, obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Assignee') + self.assertSameSet([obj], [x.getObject() for x in result]) + result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor') + self.assertSameSet([obj2, ], [x.getObject() for x in result]) + finally: + sql_catalog.sql_catalog_object_list = \ + current_sql_catalog_object_list + sql_catalog.sql_clear_catalog = \ + current_sql_clear_catalog + sql_catalog.sql_catalog_local_role_keys = \ + current_sql_catalog_local_role_keys + sql_catalog.sql_catalog_role_keys = \ + current_sql_catalog_role_keys + sql_catalog.sql_search_tables = current_sql_search_tables + get_transaction().commit() + + def test_UserOrGroupRoleIndexing(self, quiet=quiet, run=run_all_test): + if not run: + return + + login = PortalTestCase.login + logout = self.logout + user1 = 'a_great_user_name' + user1_group = 'a_great_user_group' + uf = self.getPortal().acl_users + uf._doAddUser(user1, user1, ['Member', ], []) + uf.zodb_groups.addGroup( user1_group, user1_group, user1_group) + new = uf.zodb_groups.addPrincipalToGroup( user1, user1_group ) + + perm = 'View' + folder = self.getOrganisationModule() + folder.manage_setLocalRoles(user1, ['Author', 'Auditor']) + portal_type = 'Organisation' + organisation = folder.newContent(portal_type=portal_type) + + sql_connection = self.getSQLConnection() + def query(sql): + result = sql_connection.manage_test(sql) + return result.dictionaries() + + login(self, user1) + + # Add a new table to the catalog + sql_catalog = self.portal.portal_catalog.getSQLCatalog() + + local_roles_table = "test_user_or_group_local_roles" + + create_local_role_table_sql = """ +CREATE TABLE `%s` ( + `uid` BIGINT UNSIGNED NOT NULL, + `assignee_reference` varchar(32) NOT NULL default '', + `viewable_assignee_reference` varchar(32) NOT NULL default '', + PRIMARY KEY (`uid`), + KEY `assignee_reference` (`assignee_reference`), + KEY `viewable_assignee_reference` (`viewable_assignee_reference`) +) TYPE=InnoDB; + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z_create_%s' % local_roles_table, + title = '', + arguments = "", + connection_id = 'erp5_sql_connection', + template = create_local_role_table_sql) + + drop_local_role_table_sql = """ +DROP TABLE IF EXISTS %s + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z0_drop_%s' % local_roles_table, + title = '', + arguments = "", + connection_id = 'erp5_sql_connection', + template = drop_local_role_table_sql) + + catalog_local_role_sql = """ +REPLACE INTO + %s +VALUES +<dtml-in prefix="loop" expr="_.range(_.len(uid))"> +( + <dtml-sqlvar expr="uid[loop_item]" type="int">, + <dtml-sqlvar expr="getAssignee[loop_item] or ''" type="string" optional>, + <dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional> +) +<dtml-if sequence-end> +<dtml-else> +, +</dtml-if> +</dtml-in> + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z_catalog_%s_list' % local_roles_table, + title = '', + connection_id = 'erp5_sql_connection', + arguments = "\n".join(['uid', + 'getAssignee', + 'getViewPermissionAssignee']), + template = catalog_local_role_sql) + + get_transaction().commit() + current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list + sql_catalog.sql_catalog_object_list = \ + current_sql_catalog_object_list + \ + ('z_catalog_%s_list' % local_roles_table,) + current_sql_clear_catalog = sql_catalog.sql_clear_catalog + sql_catalog.sql_clear_catalog = \ + current_sql_clear_catalog + \ + ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table) + current_sql_catalog_local_role_keys = \ + sql_catalog.sql_catalog_local_role_keys + sql_catalog.sql_catalog_local_role_keys = ( + 'Owner | viewable_owner', + 'Assignee | %s.assignee_reference' % \ + local_roles_table,) + + current_sql_catalog_role_keys = \ + sql_catalog.sql_catalog_role_keys + sql_catalog.sql_catalog_role_keys = ( + 'Assignee | %s.viewable_assignee_reference' % \ + local_roles_table,) + + current_sql_search_tables = sql_catalog.sql_search_tables + sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \ + [local_roles_table] + + portal = self.getPortal() + get_transaction().commit() + + try: + # Clear catalog + portal_catalog = self.getCatalogTool() + portal_catalog.manage_catalogClear() + get_transaction().commit() + self.portal.portal_caches.clearAllCache() + get_transaction().commit() + + organisation_relative_url = organisation.getRelativeUrl() + countResults = organisation.portal_catalog.countResults + count_result_kw = {'relative_url': organisation_relative_url} + + use_case_number = 0 + for view_permission_role_list, security_group_list, \ + global_view, associate_view, assignee_view, both_view in \ + [ + # No view permission + ([], [], 0, 0, 0, 0), + ([], [(user1, ['Associate'])], 0, 0, 0, 0), + ([], [(user1, ['Assignee'])], 0, 0, 0, 0), + ([], [(user1, ['Assignee', 'Associate'])], 0, 0, 0, 0), + ([], [(user1_group, ['Assignee'])], 0, 0, 0, 0), + ([], [(user1_group, ['Assignee', 'Associate'])], 0, 0, 0, 0), + ([], [(user1, ['Assignee']), + (user1_group, ['Assignee'])], 0, 0, 0, 0), + ([], [(user1, ['Assignee']), + (user1_group, ['Assignee', 'Associate'])], 0, 0, 0, 0), + + # View permission for Assignee + (['Assignee'], [], 0, 0, 0, 0), + (['Assignee'], [(user1, ['Associate'])], 0, 0, 0, 0), + (['Assignee'], [(user1, ['Assignee'])], 1, 0, 1, 1), + (['Assignee'], [(user1, ['Assignee', 'Associate'])], 1, 0, 1, 1), + (['Assignee'], [(user1_group, ['Assignee'])], 1, 0, 0, 0), + (['Assignee'], [(user1_group, ['Assignee', 'Associate'])], + 1, 0, 0, 0), + (['Assignee'], [(user1, ['Assignee']), + (user1_group, ['Assignee'])], 1, 0, 1, 1), + (['Assignee'], [(user1, ['Assignee']), + (user1_group, ['Assignee', 'Associate'])], + 1, 0, 1, 1), + + # View permission for Associate + (['Associate'], [], 0, 0, 0, 0), + (['Associate'], [(user1, ['Associate'])], 1, 1, 0, 0), + (['Associate'], [(user1, ['Assignee'])], 0, 0, 0, 0), + (['Associate'], [(user1, ['Assignee', 'Associate'])], 1, 1, 1, 1), + (['Associate'], [(user1_group, ['Assignee'])], 0, 0, 0, 0), + (['Associate'], [(user1_group, ['Assignee', 'Associate'])], + 1, 1, 0, 0), + (['Associate'], [(user1, ['Assignee']), + (user1_group, ['Assignee'])], 0, 0, 0, 0), + (['Associate'], [(user1, ['Assignee']), + (user1_group, ['Assignee', 'Associate'])], + 1, 1, 1, 1), + + # View permission for Associate and Assignee + (['Associate', 'Assignee'], [], 0, 0, 0, 0), + (['Associate', 'Assignee'], [(user1, ['Associate'])], 1, 1, 0, 0), + (['Associate', 'Assignee'], [(user1, ['Assignee'])], 1, 0, 1, 1), + (['Associate', 'Assignee'], + [(user1, ['Assignee', 'Associate'])], 1, 1, 1, 1), + (['Associate', 'Assignee'], + [(user1_group, ['Assignee'])], 1, 0, 0, 0), + (['Associate', 'Assignee'], + [(user1_group, ['Assignee', 'Associate'])], 1, 1, 0, 0), + (['Associate', 'Assignee'], [(user1, ['Assignee']), + (user1_group, ['Assignee'])], 1, 0, 1, 1), + (['Associate', 'Assignee'], [(user1, ['Assignee']), + (user1_group, ['Assignee', 'Associate'])], + 1, 1, 1, 1), + ]: + + use_case_number += 1 + organisation.manage_permission(perm, view_permission_role_list, 0) + organisation.manage_delLocalRoles([user1, user1_group]) + for security_group, local_role_list in security_group_list: + organisation.manage_setLocalRoles(security_group, local_role_list) + organisation.reindexObject() + get_transaction().commit() + self.tic() + + for expected_result, local_roles in \ + [ + (global_view, None), + (associate_view, 'Associate'), + (assignee_view, 'Assignee'), + (both_view, ['Associate', 'Assignee']), + ]: + + object_security_uid = query( + 'SELECT security_uid FROM catalog WHERE relative_url="%s"' % \ + organisation_relative_url + )[0]['security_uid'] + + if object_security_uid is not None: + roles_and_users = query( + 'SELECT allowedRolesAndUsers FROM roles_and_users WHERE uid="%s"' % \ + object_security_uid + ) + else: + roles_and_users = '' + + monovalue_references = query( + 'SELECT * FROM test_user_or_group_local_roles WHERE uid="%s"' % \ + organisation.getUid())[0] + assignee_reference = monovalue_references['assignee_reference'] + viewable_assignee_reference = \ + monovalue_references['viewable_assignee_reference'] + + result = countResults(local_roles=local_roles, **count_result_kw)[0][0] + if result != expected_result: + countResults(local_roles=local_roles, src__=1, + **count_result_kw) + self.fail('Use case %s\n\tView permission is given to: %s\n\t' \ + 'Local roles are: %s\n\t' \ + 'local_roles parameter is: %s\n\t' \ + 'Object IS %s returned by portal_catalog!\n\t' \ + '\n\tSecurity uid is: %s\n\t' + 'Roles and users: %s\n\t' + 'assignee_reference: %s\n\t' + 'viewable_assignee_reference: %s\n\t' + '\n\tSQL generated: \n\n%s' \ + '' % \ + (use_case_number, + view_permission_role_list, + organisation.__ac_local_roles__, + local_roles, ['NOT', ''][result], + object_security_uid, + str([x['allowedRolesAndUsers'] for x in roles_and_users]), + assignee_reference, + viewable_assignee_reference, + countResults(local_roles=local_roles, src__=1, + **count_result_kw))) + + finally: + sql_catalog.sql_catalog_object_list = \ + current_sql_catalog_object_list + sql_catalog.sql_clear_catalog = \ + current_sql_clear_catalog + sql_catalog.sql_catalog_local_role_keys = \ + current_sql_catalog_local_role_keys + sql_catalog.sql_catalog_role_keys = \ + current_sql_catalog_role_keys + sql_catalog.sql_search_tables = current_sql_search_tables + get_transaction().commit() + + def test_UserOrGroupLocalRoleIndexing(self, quiet=quiet, run=run_all_test): + if not run: + return + + login = PortalTestCase.login + logout = self.logout + user1 = 'another_great_user_name' + user1_group = 'another_great_user_group' + uf = self.getPortal().acl_users + uf._doAddUser(user1, user1, ['Member', ], []) + uf.zodb_groups.addGroup( user1_group, user1_group, user1_group) + new = uf.zodb_groups.addPrincipalToGroup( user1, user1_group ) + + perm = 'View' + folder = self.getOrganisationModule() + folder.manage_setLocalRoles(user1, ['Author', 'Auditor']) + portal_type = 'Organisation' + organisation = folder.newContent(portal_type=portal_type) + + sql_connection = self.getSQLConnection() + def query(sql): + result = sql_connection.manage_test(sql) + return result.dictionaries() + + login(self, user1) + + # Add a new table to the catalog + sql_catalog = self.portal.portal_catalog.getSQLCatalog() + + local_roles_table = "another_test_user_or_group_local_roles" + + create_local_role_table_sql = """ +CREATE TABLE `%s` ( + `uid` BIGINT UNSIGNED NOT NULL, + `viewable_assignee_reference` varchar(32) NOT NULL default '', + PRIMARY KEY (`uid`), + KEY `viewable_assignee_reference` (`viewable_assignee_reference`) +) TYPE=InnoDB; + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z_create_%s' % local_roles_table, + title = '', + arguments = "", + connection_id = 'erp5_sql_connection', + template = create_local_role_table_sql) + + drop_local_role_table_sql = """ +DROP TABLE IF EXISTS %s + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z0_drop_%s' % local_roles_table, + title = '', + arguments = "", + connection_id = 'erp5_sql_connection', + template = drop_local_role_table_sql) + + catalog_local_role_sql = """ +REPLACE INTO + %s +VALUES +<dtml-in prefix="loop" expr="_.range(_.len(uid))"> +( + <dtml-sqlvar expr="uid[loop_item]" type="int">, + <dtml-sqlvar expr="getViewPermissionAssignee[loop_item] or ''" type="string" optional> +) +<dtml-if sequence-end> +<dtml-else> +, +</dtml-if> +</dtml-in> + """ % local_roles_table + sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod( + id = 'z_catalog_%s_list' % local_roles_table, + title = '', + connection_id = 'erp5_sql_connection', + arguments = "\n".join(['uid', + 'getViewPermissionAssignee']), + template = catalog_local_role_sql) + + get_transaction().commit() + current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list + sql_catalog.sql_catalog_object_list = \ + current_sql_catalog_object_list + \ + ('z_catalog_%s_list' % local_roles_table,) + current_sql_clear_catalog = sql_catalog.sql_clear_catalog + sql_catalog.sql_clear_catalog = \ + current_sql_clear_catalog + \ + ('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table) + + current_sql_catalog_role_keys = \ + sql_catalog.sql_catalog_role_keys + sql_catalog.sql_catalog_role_keys = ( + 'Owner | viewable_owner', + 'Assignee | %s.viewable_assignee_reference' % \ + local_roles_table,) + + current_sql_search_tables = sql_catalog.sql_search_tables + sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \ + [local_roles_table] + + portal = self.getPortal() + get_transaction().commit() + + try: + # Clear catalog + portal_catalog = self.getCatalogTool() + portal_catalog.manage_catalogClear() + get_transaction().commit() + self.portal.portal_caches.clearAllCache() + get_transaction().commit() + + organisation_relative_url = organisation.getRelativeUrl() + countResults = organisation.portal_catalog.countResults + count_result_kw = {'relative_url': organisation_relative_url} + + use_case_number = 0 + for view_permission_role_list, security_group_list, \ + associate_view, assignee_view in \ + [ + # No view permission + ([], [], 0, 0), + ([], [(user1, ['Associate'])], 0, 0), + ([], [(user1, ['Assignee'])], 0, 0), + ([], [(user1, ['Assignee', 'Associate'])], 0, 0), + ([], [(user1_group, ['Assignee'])], 0, 0), + ([], [(user1_group, ['Assignee', 'Associate'])], 0, 0), + ([], [(user1, ['Assignee']), + (user1_group, ['Assignee'])], 0, 0), + ([], [(user1, ['Assignee']), + (user1_group, ['Assignee', 'Associate'])], 0, 0), + + # View permission for Assignee + (['Assignee'], [], 0, 0), + (['Assignee'], [(user1, ['Associate'])], 0, 0), + (['Assignee'], [(user1, ['Assignee'])], 0, 1), + (['Assignee'], [(user1, ['Assignee', 'Associate'])], 0, 1), + (['Assignee'], [(user1_group, ['Assignee'])], 0, 1), + (['Assignee'], [(user1_group, ['Assignee', 'Associate'])], 0, 1), + (['Assignee'], [(user1, ['Assignee']), + (user1_group, ['Assignee'])], 0, 1), + (['Assignee'], [(user1, ['Assignee']), + (user1_group, ['Assignee', 'Associate'])], 0, 1), + + # View permission for Associate + (['Associate'], [], 0, 0), + (['Associate'], [(user1, ['Associate'])], 1, 0), + (['Associate'], [(user1, ['Assignee'])], 0, 0), + (['Associate'], [(user1, ['Assignee', 'Associate'])], 1, 0), + (['Associate'], [(user1_group, ['Assignee'])], 0, 0), + (['Associate'], [(user1_group, ['Assignee', 'Associate'])], 1, 0), + (['Associate'], [(user1, ['Assignee']), + (user1_group, ['Assignee'])], 0, 0), + (['Associate'], [(user1, ['Assignee']), + (user1_group, ['Assignee', 'Associate'])], 1, 0), + + # View permission for Associate and Assignee + (['Associate', 'Assignee'], [], 0, 0), + (['Associate', 'Assignee'], [(user1, ['Associate'])], 1, 0), + (['Associate', 'Assignee'], [(user1, ['Assignee'])], 0, 1), + (['Associate', 'Assignee'], + [(user1, ['Assignee', 'Associate'])], 1, 1), + (['Associate', 'Assignee'], + [(user1_group, ['Assignee'])], 0, 1), + (['Associate', 'Assignee'], + [(user1_group, ['Assignee', 'Associate'])], 1, 1), + (['Associate', 'Assignee'], [(user1, ['Assignee']), + (user1_group, ['Assignee'])], 0, 1), + (['Associate', 'Assignee'], [(user1, ['Assignee']), + (user1_group, ['Assignee', 'Associate'])], 1, 1), + ]: + + use_case_number += 1 + organisation.manage_permission(perm, view_permission_role_list, 0) + organisation.manage_delLocalRoles([user1, user1_group]) + for security_group, local_role_list in security_group_list: + organisation.manage_setLocalRoles(security_group, local_role_list) + organisation.reindexObject() + get_transaction().commit() + self.tic() + + for expected_result, local_roles in \ + [ + (associate_view or assignee_view, None), + (associate_view, 'Associate'), + (assignee_view, 'Assignee'), + (associate_view or assignee_view, ['Associate', 'Assignee']), + ]: + + object_security_uid = query( + 'SELECT security_uid FROM catalog WHERE relative_url="%s"' % \ + organisation_relative_url + )[0]['security_uid'] + + if object_security_uid is not None: + roles_and_users = query( + 'SELECT allowedRolesAndUsers FROM roles_and_users WHERE uid="%s"' % \ + object_security_uid + ) + else: + roles_and_users = '' + + monovalue_references = query( + 'SELECT * FROM %s WHERE uid="%s"' % \ + (local_roles_table, organisation.getUid()))[0] + viewable_assignee_reference = \ + monovalue_references['viewable_assignee_reference'] + + result = countResults(local_roles=local_roles, **count_result_kw)[0][0] + if result != expected_result: + countResults(local_roles=local_roles, src__=1, + **count_result_kw) + self.fail('Use case %s\n\tView permission is given to: %s\n\t' \ + 'Local roles are: %s\n\t' \ + 'local_roles parameter is: %s\n\t' \ + 'Object IS %s returned by portal_catalog!\n\t' \ + '\n\tSecurity uid is: %s\n\t' + 'Roles and users: %s\n\t' + 'viewable_assignee_reference: %s\n\t' + '\n\tSQL generated: \n\n%s' \ + '' % \ + (use_case_number, + view_permission_role_list, + organisation.__ac_local_roles__, + local_roles, ['NOT', ''][result], + object_security_uid, + str([x['allowedRolesAndUsers'] for x in roles_and_users]), + viewable_assignee_reference, + countResults(local_roles=local_roles, src__=1, + **count_result_kw))) + + finally: + sql_catalog.sql_catalog_object_list = \ + current_sql_catalog_object_list + sql_catalog.sql_clear_catalog = \ + current_sql_clear_catalog + sql_catalog.sql_catalog_role_keys = \ + current_sql_catalog_role_keys + sql_catalog.sql_search_tables = current_sql_search_tables + get_transaction().commit() + def test_ObjectReindexationConcurency(self, quiet=quiet, run=run_all_test): if not run: return -- 2.30.9