diff --git a/product/ERP5/Document/Alarm.py b/product/ERP5/Document/Alarm.py index 3840c3e9598f635dc8febd5903304a49bfdd042e..89353cec33eb216e4ff2f403708154c878e8b130 100644 --- a/product/ERP5/Document/Alarm.py +++ b/product/ERP5/Document/Alarm.py @@ -28,6 +28,7 @@ import zope.interface from AccessControl import ClassSecurityInfo +from AccessControl import Unauthorized from Products.CMFCore.utils import getToolByName from Products.ERP5Type import Permissions, PropertySheet from Products.ERP5Type.XMLObject import XMLObject @@ -36,6 +37,9 @@ from DateTime import DateTime from Products.ERP5Type.Message import Message from Products.ERP5Type.DateUtils import addToDate from Products.CMFCore.PortalContent import _getViewFor +from Products.ERP5Security.ERP5UserManager import SUPER_USER +from AccessControl.SecurityManagement import getSecurityManager, \ + setSecurityManager, newSecurityManager class PeriodicityMixin: """ @@ -280,7 +284,7 @@ class Alarm(XMLObject, PeriodicityMixin): """ return self.hasActivity(only_valid=1) - security.declareProtected(Permissions.ManagePortal, 'activeSense') + security.declareProtected(Permissions.AccessContentsInformation, 'activeSense') def activeSense(self, fixit=0): """ This method launches the sensing process as activities. @@ -291,35 +295,47 @@ class Alarm(XMLObject, PeriodicityMixin): The result of the sensing process can be obtained by invoking the sense method or by requesting a report. """ - # LOG('activeSense, self.getPath()',0,self.getPath()) - - # Set the next date at which this method should be invoked - self.setNextAlarmDate() - - # Find the active sensing method and invoke it - # as an activity so that we can benefit from - # distribution of alarm processing as soon as possible - method_id = self.getActiveSenseMethodId() - if method_id not in (None, ''): - # A tag is provided as a parameter in order to be - # able to notify the user after all processes are ended - # Tag is generated from portal_ids so that it can be retrieved - # later when creating an active process for example - # We do some inspection to keep compatibility - # (because fixit and tag were not set previously) - tag = str(self.portal_ids.generateNewLengthId(id_group=self.getId())) - kw = {} - method = getattr(self, method_id) - name_list = method.func_code.co_varnames - if 'fixit' in name_list or (method.func_defaults is not None - and len(method.func_defaults) < len(name_list)): - # New API - also if variable number of named parameters - getattr(self.activate(tag=tag), method_id)(fixit=fixit, tag=tag) - else: - # Old API - getattr(self.activate(tag=tag), method_id)() - if self.isAlarmNotificationMode(): - self.activate(after_tag=tag).notify(include_active=True) + portal_membership = self.getPortalObject().portal_membership + if fixit or not self.getEnabled(): + checkPermission = portal_membership.checkPermission + if not checkPermission(Permissions.ManagePortal, self): + raise Unauthorized('fixing problems or activating a disabled alarm is not allowed') + + # Switch to the superuser temporarily, so that the behavior would not + # change even if this method is invoked by random users. + sm = getSecurityManager() + newSecurityManager(None, portal_membership.getMemberById(SUPER_USER)) + try: + # Set the next date at which this method should be invoked + self.setNextAlarmDate() + + # Find the active sensing method and invoke it + # as an activity so that we can benefit from + # distribution of alarm processing as soon as possible + method_id = self.getActiveSenseMethodId() + if method_id not in (None, ''): + # A tag is provided as a parameter in order to be + # able to notify the user after all processes are ended + # Tag is generated from portal_ids so that it can be retrieved + # later when creating an active process for example + # We do some inspection to keep compatibility + # (because fixit and tag were not set previously) + tag = str(self.portal_ids.generateNewLengthId(id_group=self.getId())) + kw = {} + method = getattr(self, method_id) + name_list = method.func_code.co_varnames + if 'fixit' in name_list or (method.func_defaults is not None + and len(method.func_defaults) < len(name_list)): + # New API - also if variable number of named parameters + getattr(self.activate(tag=tag), method_id)(fixit=fixit, tag=tag) + else: + # Old API + getattr(self.activate(tag=tag), method_id)() + if self.isAlarmNotificationMode(): + self.activate(after_tag=tag).notify(include_active=True) + finally: + # Restore the original user. + setSecurityManager(sm) security.declareProtected(Permissions.ManagePortal, 'sense') def sense(self, process=None): diff --git a/product/ERP5/tests/testAlarm.py b/product/ERP5/tests/testAlarm.py index b6f18471b79df57b12345553428b9340d774a723..f1853f93e8c0e2a5eef5280639bccce2662f6a0a 100644 --- a/product/ERP5/tests/testAlarm.py +++ b/product/ERP5/tests/testAlarm.py @@ -31,7 +31,9 @@ import transaction from Testing import ZopeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase -from AccessControl.SecurityManagement import newSecurityManager +from AccessControl.SecurityManagement import newSecurityManager, \ + getSecurityManager, setSecurityManager +from AccessControl import Unauthorized from DateTime import DateTime from zLOG import LOG from Products.ERP5Type.DateUtils import addToDate @@ -591,7 +593,100 @@ class TestAlarm(ERP5TypeTestCase): else: raise AssertionError, m.method_id + def test_19_ManualInvocation(self, quiet=0, run=run_all_test): + """ + test if an alarm can be invoked directly by the user securely, + and if the results are identical when allowed. + """ + if not run: return + if not quiet: + message = 'Test manual invocation' + ZopeTestCase._print('\n%s ' % message) + LOG('Testing... ', 0, message) + + alarm = self.newAlarm() + # Create script that generate active process + sense_method_id = 'Alarm_setBogusLocalProperty' + skin_folder_id = 'custom' + skin_folder = self.getPortal().portal_skins[skin_folder_id] + skin_folder.manage_addProduct['PythonScripts']\ + .manage_addPythonScript(id=sense_method_id) + skin_folder[sense_method_id].ZPythonScript_edit('*args,**kw', + 'context.setProperty("bogus", str(context.showPermissions()))') + + # update alarm properties + alarm.edit(active_sense_method_id=sense_method_id, + enabled=False) + transaction.commit() + self.tic() + + # Make a normal user. + uf = self.getPortal().acl_users + uf._doAddUser('normal', '', ['Member', 'Auditor'], []) + user = uf.getUserById('normal').__of__(uf) + # Check the pre-conditions. + self.assertEquals(alarm.getProperty('bogus', None), None) + self.assertEquals(alarm.getEnabled(), False) + sm = getSecurityManager() + newSecurityManager(None, user) + + # Non-managers must not be able to invoke a disabled alarm. + self.assertRaises(Unauthorized, alarm.activeSense) + self.assertRaises(Unauthorized, alarm.activeSense, fixit=1) + + # Non-managers must not be able to invoke the automatic fixation. + setSecurityManager(sm) + alarm.setEnabled(True) + self.assertEquals(alarm.getEnabled(), True) + newSecurityManager(None, user) + self.assertRaises(Unauthorized, alarm.activeSense, fixit=1) + + # Now, check that everybody can invoke an enabled alarm manually. + setSecurityManager(sm) + correct_answer = str(alarm.showPermissions()) + self.assertNotEquals(correct_answer, None) + + alarm.activeSense() + transaction.commit() + self.tic() + self.assertEquals(alarm.getProperty('bogus', None), correct_answer) + alarm.setProperty('bogus', None) + self.assertEquals(alarm.getProperty('bogus', None), None) + + newSecurityManager(None, user) + alarm.activeSense() + transaction.commit() + self.tic() + self.assertEquals(alarm.getProperty('bogus', None), correct_answer) + setSecurityManager(sm) + alarm.setProperty('bogus', None) + + # Check that Manager can invoke an alarm freely. + alarm.activeSense(fixit=1) + transaction.commit() + self.tic() + self.assertEquals(alarm.getProperty('bogus', None), correct_answer) + alarm.setProperty('bogus', None) + self.assertEquals(alarm.getProperty('bogus', None), None) + + alarm.setEnabled(False) + self.assertEquals(alarm.getEnabled(), False) + + alarm.activeSense() + transaction.commit() + self.tic() + self.assertEquals(alarm.getProperty('bogus', None), correct_answer) + alarm.setProperty('bogus', None) + self.assertEquals(alarm.getProperty('bogus', None), None) + + alarm.activeSense(fixit=1) + transaction.commit() + self.tic() + self.assertEquals(alarm.getProperty('bogus', None), correct_answer) + alarm.setProperty('bogus', None) + self.assertEquals(alarm.getProperty('bogus', None), None) + def test_suite(): suite = unittest.TestSuite()