ActivityTool.py 72.4 KB
Newer Older
1
from __future__ import absolute_import
Jean-Paul Smets's avatar
Jean-Paul Smets committed
2 3 4
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################
29
from six import string_types as basestring
30
from Products.ERP5Type.Utils import ensure_list, str2unicode
Jean-Paul Smets's avatar
Jean-Paul Smets committed
31

32
import copy
33
import socket
34
from six.moves import urllib
35 36
import threading
import sys
37
import six
38
from collections import defaultdict
39
from six.moves.cPickle import dumps, loads
40
from Products.CMFCore import permissions as CMFCorePermissions
41
from Products.CMFActivity.ActiveResult import ActiveResult
42
from Products.CMFActivity.ActiveObject import DEFAULT_ACTIVITY
43
from Products.CMFActivity.ActivityConnection import ActivityConnection
44
from Products.PythonScripts.Utility import allow_class
45
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
46 47 48 49
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
50
from AccessControl.users import system as system_user
51
from Products.CMFCore.utils import UniqueObject
52
from Products.ERP5Type.Globals import InitializeClass, DTMLFile
53
from Acquisition import aq_base, aq_inner, aq_parent
54 55
from .ActivityBuffer import ActivityBuffer
from .ActivityRuntimeEnvironment import BaseMessage
56
from zExceptions import ExceptionFormatter, Redirect
57
from BTrees.OIBTree import OIBTree
58
from BTrees.OOBTree import OOBTree
59 60
from Zope2 import app
from Products.ERP5Type.UnrestrictedMethod import PrivilegedUser
61
from zope.component.hooks import setSite
62
import transaction
63
from App.config import getConfiguration
64
from Shared.DC.ZRDB.Results import Results
65

66
from zope.globalrequest import getRequest, setRequest
67
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
68

69
from zLOG import LOG, INFO, WARNING, ERROR
70
import warnings
71
from time import time
72
from pprint import pformat
73 74

try:
75
  from Products.TimerService import getTimerService
76
except ImportError:
77 78
  def getTimerService(self):
    pass
Jean-Paul Smets's avatar
Jean-Paul Smets committed
79

80
from traceback import format_list, extract_stack
81

Jean-Paul Smets's avatar
Jean-Paul Smets committed
82 83 84 85
# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
86 87
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
88
is_running_lock = threading.Lock()
89
currentNode = None
90
_server_address = None
91 92
ROLE_IDLE = 0
ROLE_PROCESSING = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
93

94 95
# Logging channel definitions
import logging
96
activity_tracking_logger = logging.getLogger('Tracking')
97
activity_timing_logger = logging.getLogger('CMFActivity.TimingLog')
98 99


100 101 102 103 104 105 106 107
def activity_timing_method(method, args, kw):
  begin = time()
  try:
    return method(*args, **kw)
  finally:
    end = time()
    activity_timing_logger.info('%.02fs: %r(*%r, **%r)' % (end - begin, method, args, kw))

108 109 110 111 112 113 114
def getServerAddress():
    """
    Return current server address
    """
    global _server_address
    if _server_address is None:
        ip = port = ''
115 116 117 118
        try:
            zopewsgi = sys.modules['Products.ERP5.bin.zopewsgi']
        except KeyError:
            from asyncore import socket_map
119
            for k, v in six.iteritems(socket_map):
120 121 122 123 124 125 126 127
                if hasattr(v, 'addr'):
                    # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
                    type = str(getattr(v, '__class__', 'unknown'))
                    if type == 'ZServer.HTTPServer.zhttp_server':
                        ip, port = v.addr
                        break
        else:
            ip, port = zopewsgi.server.addr
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
        if ip == '0.0.0.0':
            ip = socket.gethostbyname(socket.gethostname())
        _server_address = '%s:%s' %(ip, port)
    return _server_address

def getCurrentNode():
    """ Return current node identifier """
    global currentNode
    if currentNode is None:
      currentNode = getattr(
        getConfiguration(),
        'product_config',
        {},
      ).get('cmfactivity', {}).get('node-id')
    if currentNode is None:
      warnings.warn('Node name auto-generation is deprecated, please add a'
        '\n'
        '<product-config CMFActivity>\n'
        '  node-id = ...\n'
        '</product-config>\n'
        'section in your zope.conf, replacing "..." with a cluster-unique '
        'node identifier.', DeprecationWarning)
      currentNode = getServerAddress()
    return currentNode

153 154 155
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
156
global_activity_buffer = defaultdict(dict)
157
from _thread import get_ident
158

159 160 161 162
MESSAGE_NOT_EXECUTED = 0
MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2

163

164 165 166 167
class SkippedMessage(Exception):
  pass


168
class Message(BaseMessage):
169
  """Activity Message Class.
170

171 172
  Message instances are stored in an activity queue, inside the Activity Tool.
  """
173

174
  active_process = None
175
  active_process_uid = None
176
  call_traceback = None
177
  exc_info = None
178
  exc_type = None
179 180
  is_executed = MESSAGE_NOT_EXECUTED
  traceback = None
181 182 183
  user_name = None
  user_object = None
  user_folder_path = None
184
  document_uid = None
185
  is_registered = False
186
  line = None
187

188 189 190
  def __init__(
      self,
      url,
191
      document_uid,
192 193 194 195 196 197 198 199 200
      active_process,
      active_process_uid,
      activity_kw,
      method_id,
      args, kw,
      request=None,
      portal_activities=None,
    ):
    self.object_path = url
201
    self.document_uid = document_uid
202 203
    self.active_process = active_process
    self.active_process_uid = active_process_uid
Jean-Paul Smets's avatar
Jean-Paul Smets committed
204 205 206 207
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
208
    if getattr(portal_activities, 'activity_creation_trace', False):
209 210 211 212
      # Save current traceback, to make it possible to tell where a message
      # was generated.
      # Strip last stack entry, since it will always be the same.
      self.call_traceback = ''.join(format_list(extract_stack()[:-1]))
213 214 215 216 217 218 219 220
    user = getSecurityManager().getUser()
    self.user_object = copy.deepcopy(aq_base(user))
    # Note: userfolders are not ERP5 objects, so use OFS API.
    self.user_folder_path = getattr(
      aq_parent(user),
      'getPhysicalPath',
      lambda: None,
    )()
221
    # Store REQUEST Info
222
    self.request_info = {}
223
    if request is not None:
224 225 226 227 228 229 230 231 232
      if 'SERVER_URL' in request.other:
        self.request_info['SERVER_URL'] = request.other['SERVER_URL']
      if 'VirtualRootPhysicalPath' in request.other:
        self.request_info['VirtualRootPhysicalPath'] = \
          request.other['VirtualRootPhysicalPath']
      if 'HTTP_ACCEPT_LANGUAGE' in request.environ:
        self.request_info['HTTP_ACCEPT_LANGUAGE'] = \
          request.environ['HTTP_ACCEPT_LANGUAGE']
      self.request_info['_script'] = list(request._script)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
233

234 235 236 237 238 239 240 241
  @staticmethod
  def load(s, **kw):
    self = loads(s)
    self.__dict__.update(kw)
    return self

  dump = dumps

242 243 244 245 246 247 248
  def getGroupId(self):
    get = self.activity_kw.get
    group_method_id = get('group_method_id', '')
    if group_method_id is None:
      group_method_id = 'portal_activities/dummyGroupMethod/' + self.method_id
    return group_method_id + '\0' + get('group_id', '')

249 250 251 252
  def getGroupMethodCost(self):
    # Meaningless if called on a non-grouped message
    return self.activity_kw.get('group_method_cost', .01)

253 254 255 256 257 258
  def _getObject(self, activity_tool):
    obj = activity_tool.getPhysicalRoot()
    for id in self.object_path[1:]:
      obj = obj[id]
    return obj

259
  def getObject(self, activity_tool):
260
    """return the object referenced in this message."""
261
    try:
262
      obj = self._getObject(activity_tool)
263
    except KeyError:
264
      LOG('CMFActivity', WARNING, "Message dropped (no object found at path %r)"
265
          % (self.object_path,), error=True)
266
      self.setExecutionState(MESSAGE_NOT_EXECUTABLE)
267
    else:
268 269
      if self.document_uid and self.document_uid != getattr(aq_base(obj), 'uid', None):
        raise ValueError("UID mismatch for %r" % obj)
270 271 272 273
      return obj

  def getObjectList(self, activity_tool):
    """return the list of object that can be expanded from this message
274 275 276 277
    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls)."""
278 279 280 281 282 283 284 285 286 287
    obj = self.getObject(activity_tool)
    if obj is None:
      return ()
    if 'expand_method_id' in self.activity_kw:
      return getattr(obj, self.activity_kw['expand_method_id'])()
    return obj,

  def getObjectCount(self, activity_tool):
    if 'expand_method_id' in self.activity_kw:
      try:
288
        obj = self._getObject(activity_tool)
289
        return len(getattr(obj, self.activity_kw['expand_method_id'])())
290
      except Exception:
291 292
        pass
    return 1
293

294
  def changeUser(self, activity_tool, annotate_transaction=True):
295
    """restore the security context for the calling user."""
296
    portal = activity_tool.getPortalObject()
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
    user = self.user_object
    if user is None and self.user_name is not None: # BBB
      user_name = self.user_name
      user_folder = portal_user_folder = portal.acl_users
      user = user_folder.getUserById(user_name)
      # if the user is not found, try to get it from a parent acl_users
      # XXX this is still far from perfect, because we need to store all
      # information about the user (like original user folder, roles) to
      # replay the activity with exactly the same security context as if
      # it had been executed without activity.
      if user is None:
        user_folder = portal.aq_parent.acl_users
        user = user_folder.getUserById(user_name)
      if user is None and user_name == system_user.getUserName():
        # The following logic partly comes from unrestricted_apply()
        # implementation in ERP5Type.UnrestrictedMethod but we get roles
        # from the portal to have more roles.
        user_folder = portal_user_folder
        user = PrivilegedUser(
          user_name,
          None,
          user_folder.valid_roles(),
          (),
        )
    else:
      user_folder = portal.getPhysicalRoot().unrestrictedTraverse(
        self.user_folder_path,
      )
      user_name = user.getIdOrUserName()
326
    if user is not None:
327
      user = user.__of__(user_folder)
328
      newSecurityManager(None, user)
329
      if annotate_transaction:
330 331 332
        if six.PY2:
          user_name = user_name.decode('utf-8')
        transaction.get().setUser(user_name, u'/'.join(user_folder.getPhysicalPath()))
333
    else :
334
      LOG("CMFActivity", WARNING,
335
          "Unable to find user %r in the portal" % user_name)
336
      noSecurityManager()
337 338
    return user

339 340 341
  def activateResult(self, active_process, result, object):
    if not isinstance(result, ActiveResult):
      result = ActiveResult(result=result)
342 343 344
    signature = self.activity_kw.get('signature')
    if signature:
      result.edit(id=signature)
345 346
    # XXX Allow other method_id in future
    result.edit(object_path=object, method_id=self.method_id)
347
    active_process.postResult(result)
348

Jean-Paul Smets's avatar
Jean-Paul Smets committed
349
  def __call__(self, activity_tool):
350
    try:
351
      obj = self.getObject(activity_tool)
352
      if obj is not None:
353 354
        old_security_manager = getSecurityManager()
        try:
355 356
          # Change user if required (TO BE DONE)
          # We will change the user only in order to execute this method
357
          self.changeUser(activity_tool)
358 359 360
          # XXX: There is no check to see if user is allowed to access
          #      that method !
          method = getattr(obj, self.method_id)
361
          transaction.get().note(
362 363 364 365
            u'CMFActivity {}/{}'.format(
              '/'.join([str2unicode(x) for x in self.object_path]),
              str2unicode(self.method_id),
            )
366
          )
367 368 369 370
          # Store site info
          setSite(activity_tool.getParentValue())
          if activity_tool.activity_timing_log:
            result = activity_timing_method(method, self.args, self.kw)
371
          else:
372
            result = method(*self.args, **self.kw)
373 374 375 376
        finally:
          setSecurityManager(old_security_manager)

        if method is not None:
377 378 379 380
          if self.active_process and result is not None:
            self.activateResult(
              activity_tool.unrestrictedTraverse(self.active_process),
              result, obj)
381
          self.setExecutionState(MESSAGE_EXECUTED)
382 383
    except:
      self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
384

385
  def notifyUser(self, activity_tool, retry=False):
386
    """Notify the user that the activity failed."""
387 388 389
    if not activity_tool.activity_failure_mail_notification:
      return

390
    portal = activity_tool.getPortalObject()
391
    user_email = portal.getProperty('email_to_address',
392
                       portal.getProperty('email_from_address'))
393 394
    email_from_name = portal.getProperty('email_from_name',
                       portal.getProperty('email_from_address'))
395
    fail_count = self.line.retry + 1
396
    if retry:
397 398 399 400
      message = "Pending activity already failed %s times" % fail_count
    else:
      message = "Activity failed"
    path = '/'.join(self.object_path)
401
    mail_text = """From: %s <%s>
402
To: %s
403
Subject: %s: %s/%s
404

405
Node: %s
406
Failures: %s
407
User name: %r
408
Uid: %u
409 410
Document: %s
Method: %s
411 412
Arguments: %r
Named Parameters: %r
413 414 415
""" % (
      email_from_name, activity_tool.email_from_address, user_email, message,
      path, self.method_id, getCurrentNode(), fail_count,
416
      self.getUserId(),
417 418
      self.line.uid, path, self.method_id, self.args, self.kw,
    )
419 420 421 422
    if self.traceback:
      mail_text += '\nException:\n' + self.traceback
    if self.call_traceback:
      mail_text += '\nCreated at:\n' + self.call_traceback
423
    try:
424
      portal.MailHost.send(mail_text)
425
    except (socket.error, MailHostError) as message:
426 427
      LOG('ActivityTool.notifyUser', WARNING,
          'Mail containing failure information failed to be sent: %s' % message)
428

429 430 431 432 433 434 435 436
  def getUserId(self):
    user = self.user_object
    return (
      self.user_name
      if user is None else
      user.getIdOrUserName()
    )

437
  def reactivate(self, activity_tool, activity=DEFAULT_ACTIVITY):
438
    # Reactivate the original object.
439
    obj = self._getObject(activity_tool)
440
    old_security_manager = getSecurityManager()
441
    try:
442 443
      # Change user if required (TO BE DONE)
      # We will change the user only in order to execute this method
444
      user = self.changeUser(activity_tool)
445
      active_obj = obj.activate(activity=activity, **self.activity_kw)
446 447 448
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
449
      setSecurityManager(old_security_manager)
450

451 452 453 454 455 456
  def setExecutionState(self, is_executed, exc_info=None, log=True, context=None):
    """
      Set message execution state.

      is_executed can be one of MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED and
      MESSAGE_NOT_EXECUTABLE (variables defined above).
457

458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
      exc_info must be - if given - similar to sys.exc_info() return value.

      log must be - if given - True or False. If True, a log line will be
      emited with failure details. This parameter should only be used when
      invoking this method on a list of messages to avoid log flood. It is
      caller's responsability to output a log line summing up all errors, and
      to store error in Zope's error_log.

      context must be - if given - an object wrapped in acquisition context.
      It is used to access Zope's error_log object. It is not used if log is
      False.

      If given state is not MESSAGE_EXECUTED, it will also store given
      exc_info. If not given, it will extract one using sys.exc_info().
      If final exc_info does not contain any exception, current stack trace
      will be stored instead: it will hopefuly help understand why message
      is in an error state.
    """
    assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE)
    self.is_executed = is_executed
478
    if is_executed == MESSAGE_NOT_EXECUTED:
479
      if not exc_info:
480
        exc_info = sys.exc_info()
481 482
      if self.on_error_callback is not None:
        self.exc_info = exc_info
483 484
      self.exc_type = exc_info[0]
      if exc_info[0] is None:
485 486
        # Raise a dummy exception, ignore it, fetch it and use it as if it was the error causing message non-execution. This will help identifyting the cause of this misbehaviour.
        try:
487
          raise Exception('Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.')
488 489
        except Exception:
          exc_info = sys.exc_info()
490 491
      elif exc_info[0] is SkippedMessage:
        return
492 493 494 495 496
      if log:
        LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info)
        # push the error in ZODB error_log
        error_log = getattr(context, 'error_log', None)
        if error_log is not None:
497
          error_log.raising(exc_info)
498
      self.traceback = ''.join(ExceptionFormatter.format_exception(*exc_info)[1:])
499 500 501 502

  def getExecutionState(self):
    return self.is_executed

503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521
class GroupedMessage(object):
  __slots__ = 'object', '_message', 'result', 'exc_info'

  def __init__(self, object, message):
    self.object = object
    self._message = message

  args = property(lambda self: self._message.args)
  kw = property(lambda self: self._message.kw)

  def raised(self, exc_info=None):
    self.exc_info = exc_info or sys.exc_info()
    try:
      del self.result
    except AttributeError:
      pass

# XXX: Allowing restricted code to implement a grouping method is questionable
#      but there already exist some.
522
  __parent__ = property(lambda self: self.object) # for object
523
  _guarded_writes = 1 # for result
524
allow_class(GroupedMessage)
525 526 527

# Activity Registration
def activity_dict():
528
  from .Activity import SQLDict, SQLQueue, SQLJoblib
529
  return {k: getattr(v, k)() for k, v in six.iteritems(locals())}
530 531 532
activity_dict = activity_dict()


Vincent Pelletier's avatar
Vincent Pelletier committed
533 534
class Method(object):
  __slots__ = (
535 536
    '_portal_activities',
    '_passive_url',
537
    '_passive_uid',
538 539 540 541 542 543
    '_activity',
    '_active_process',
    '_active_process_uid',
    '_kw',
    '_method_id',
    '_request',
Vincent Pelletier's avatar
Vincent Pelletier committed
544
  )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
545

546
  def __init__(self, portal_activities, passive_url, passive_uid, activity,
547
      active_process, active_process_uid, kw, method_id, request):
548 549
    self._portal_activities = portal_activities
    self._passive_url = passive_url
550
    self._passive_uid = passive_uid
551 552 553 554 555 556
    self._activity = activity
    self._active_process = active_process
    self._active_process_uid = active_process_uid
    self._kw = kw
    self._method_id = method_id
    self._request = request
Jean-Paul Smets's avatar
Jean-Paul Smets committed
557 558

  def __call__(self, *args, **kw):
559
    portal_activities = self._portal_activities
560
    m = Message(
561
      url=self._passive_url,
562
      document_uid=self._passive_uid,
563 564 565 566
      active_process=self._active_process,
      active_process_uid=self._active_process_uid,
      activity_kw=self._kw,
      method_id=self._method_id,
567 568
      args=args,
      kw=kw,
569
      request=self._request,
570 571
      portal_activities=portal_activities,
    )
572
    portal_activities.getActivityBuffer().deferredQueueMessage(
573
      portal_activities, activity_dict[self._activity], m)
574
    if portal_activities.activity_tracking and m.is_registered:
575
      activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self._activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.getUserId()))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
576

577 578
allow_class(Method)

Vincent Pelletier's avatar
Vincent Pelletier committed
579 580 581
class ActiveWrapper(object):
  __slots__ = (
    '__portal_activities',
582
    '__passive_url',
583
    '__passive_uid',
Vincent Pelletier's avatar
Vincent Pelletier committed
584 585
    '__activity',
    '__active_process',
586
    '__active_process_uid',
Vincent Pelletier's avatar
Vincent Pelletier committed
587 588 589
    '__kw',
    '__request',
  )
590 591 592
  # Shortcut security lookup (avoid calling __getattr__)
  __parent__ = None

593
  def __init__(self, portal_activities, url, uid, activity, active_process,
594
      active_process_uid, kw, request):
595
    # second parameter can be an object or an object's path
596
    self.__portal_activities = portal_activities
597
    self.__passive_url = url
598
    self.__passive_uid = uid
599 600
    self.__activity = activity
    self.__active_process = active_process
601
    self.__active_process_uid = active_process_uid
602 603 604 605 606 607
    self.__kw = kw
    self.__request = request

  def __getattr__(self, name):
    return Method(
      self.__portal_activities,
608
      self.__passive_url,
609
      self.__passive_uid,
610 611
      self.__activity,
      self.__active_process,
612
      self.__active_process_uid,
613 614 615 616
      self.__kw,
      name,
      self.__request,
    )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
617

618
  def __repr__(self):
619 620
    return '<%s at 0x%x to %s>' % (self.__class__.__name__, id(self),
                                   self.__passive_url)
621

622 623 624
# True when activities cannot be executing any more.
has_processed_shutdown = False

625 626 627 628 629 630 631 632
def cancelProcessShutdown():
  """
    This method reverts the effect of calling "process_shutdown" on activity
    tool.
  """
  global has_processed_shutdown
  is_running_lock.release()
  has_processed_shutdown = False
633

634 635 636
# Due to a circular import dependency between this module and
# Products.ERP5Type.Core.Folder, both modules must import after the definitions
# of getCurrentNode and Folder (the later is a base class of BaseTool).
637 638 639 640 641 642
from Products.ERP5Type.Tool.BaseTool import BaseTool
# Activating a path means we tried to avoid loading useless
# data in cache so there would be no gain to expect.
# And all nodes are likely to have tools already loaded.
NO_DEFAULT_NODE_PREFERENCE = str, BaseTool

643
class ActivityTool (BaseTool):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
644
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
645 646 647 648 649 650 651 652 653 654 655 656
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transactino)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
657 658 659
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
660
    portal_type = 'Activity Tool'
661
    title = 'Activities'
662
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
663 664
    security = ClassSecurityInfo()

665 666
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
667
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
668
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
669
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
670
                     ,
671
                     ] + list(BaseTool.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
672 673

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
674
    manageActivities = DTMLFile( 'dtml/manageActivities', globals(), pformat=pformat )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
675

676 677 678
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

679 680
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
681

682
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
683
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals(), _getCurrentNode=getCurrentNode)
684

685 686
    distributingNode = ''
    _nodes = ()
687 688
    _family_list = ()
    _node_family_dict = None
689
    activity_creation_trace = False
690
    activity_tracking = False
691
    activity_timing_log = False
692
    activity_failure_mail_notification = False
693
    cancel_and_invoke_links_hidden = False
694 695 696 697

    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types.
698
        all = BaseTool.filtered_meta_types(self)
699 700 701 702 703 704
        meta_types = []
        for meta_type in self.all_meta_types():
            if meta_type['name'] in self.allowed_types:
                meta_types.append(meta_type)
        return meta_types

705 706 707
    def getSQLConnection(self):
      return self.aq_inner.aq_parent.cmf_activity_sql_connection()

708 709 710 711 712 713 714 715 716 717 718 719 720 721
    def maybeMigrateConnectionClass(self):
      connection_id = 'cmf_activity_sql_connection'
      sql_connection = getattr(self, connection_id, None)
      if (sql_connection is not None and
          not isinstance(sql_connection, ActivityConnection)):
        # SQL Connection migration is needed
        LOG('ActivityTool', WARNING, "Migrating MySQL Connection class")
        parent = aq_parent(aq_inner(sql_connection))
        parent._delObject(sql_connection.getId())
        new_sql_connection = ActivityConnection(connection_id,
                                                sql_connection.title,
                                                sql_connection.connection_string)
        parent._setObject(connection_id, new_sql_connection)

722
    security.declarePrivate('initialize')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
723
    def initialize(self):
724
      self.maybeMigrateConnectionClass()
725
      for activity in six.itervalues(activity_dict):
726
        activity.initialize(self, clear=False)
727 728 729 730 731 732 733 734 735 736 737 738 739 740
      # Remove old skin if any.
      skins_tool = self.getPortalObject().portal_skins
      name = 'activity'
      if (getattr(skins_tool.get(name), '_dirpath', None)
          == 'Products.CMFActivity:skins/activity'):
        for selection, skins in skins_tool.getSkinPaths():
          skins = skins.split(',')
          try:
            skins.remove(name)
          except ValueError:
            continue
          skins_tool.manage_skinLayers(
            add_skin=1, skinname=selection, skinpath=skins)
        skins_tool._delObject(name)
741

742 743 744
    def _callSafeFunction(self, batch_function):
      return batch_function()

745 746
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
747 748 749 750 751 752
      """
      return True, if we are subscribed to TimerService.
      Otherwise return False.
      """
      service = getTimerService(self)
      if service:
753
        path = '/'.join(self.getPhysicalPath())
754 755 756
        return path in service.lisSubscriptions()
      LOG('ActivityTool', INFO, 'TimerService not available')
      return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
757

758
    security.declareProtected(Permissions.manage_properties, 'subscribe')
759
    def subscribe(self, REQUEST=None, RESPONSE=None):
760 761
        """ subscribe to the global Timer Service """
        service = getTimerService(self)
762
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
763
        if not service:
764
            LOG('ActivityTool', INFO, 'TimerService not available')
765
            url += urllib.parse.quote('TimerService not available')
766 767
        else:
            service.subscribe(self)
768
            url += urllib.parse.quote("Subscribed to Timer Service")
769 770
        if RESPONSE is not None:
            RESPONSE.redirect(url)
771 772

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
773
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
774 775
        """ unsubscribe from the global Timer Service """
        service = getTimerService(self)
776
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
777
        if not service:
778
            LOG('ActivityTool', INFO, 'TimerService not available')
779
            url += urllib.parse.quote('TimerService not available')
780 781
        else:
            service.unsubscribe(self)
782
            url += urllib.parse.quote("Unsubscribed from Timer Service")
783 784
        if RESPONSE is not None:
            RESPONSE.redirect(url)
785

786 787 788 789 790 791 792 793 794 795 796 797
    security.declareProtected(Permissions.manage_properties, 'isActivityTrackingEnabled')
    def isActivityTrackingEnabled(self):
      return self.activity_tracking

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTracking')
    def manage_enableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity tracing.
        """
        self.activity_tracking = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
798
          url += urllib.parse.quote('Tracking log enabled')
799 800 801 802 803 804 805 806 807 808
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTracking')
    def manage_disableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity tracing.
        """
        self.activity_tracking = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
809
          url += urllib.parse.quote('Tracking log disabled')
810 811
          RESPONSE.redirect(url)

812 813 814 815 816 817 818 819 820 821 822 823
    security.declareProtected(Permissions.manage_properties, 'isActivityMailNotificationEnabled')
    def isActivityMailNotificationEnabled(self):
      return self.activity_failure_mail_notification

    security.declareProtected(Permissions.manage_properties, 'manage_enableMailNotification')
    def manage_enableMailNotification(self, REQUEST=None, RESPONSE=None):
        """
          Enable mail notification when activity fails.
        """
        self.activity_failure_mail_notification = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
824
          url += urllib.parse.quote('Mail notification enabled')
825 826 827 828 829 830 831 832 833 834
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableMailNotification')
    def manage_disableMailNotification(self, REQUEST=None, RESPONSE=None):
        """
          Disable mail notification when activity fails.
        """
        self.activity_failure_mail_notification = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
835
          url += urllib.parse.quote('Mail notification disabled')
836 837
          RESPONSE.redirect(url)

838 839 840 841 842 843 844 845 846 847 848 849
    security.declareProtected(Permissions.manage_properties, 'isActivityTimingLoggingEnabled')
    def isActivityTimingLoggingEnabled(self):
      return self.activity_timing_log

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTimingLogging')
    def manage_enableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity timing logging.
        """
        self.activity_timing_log = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
850
          url += urllib.parse.quote('Timing log enabled')
851 852 853 854 855 856 857 858 859 860
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTimingLogging')
    def manage_disableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity timing logging.
        """
        self.activity_timing_log = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
861
          url += urllib.parse.quote('Timing log disabled')
862 863 864 865 866 867 868 869 870 871 872 873 874 875
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'isActivityCreationTraceEnabled')
    def isActivityCreationTraceEnabled(self):
      return self.activity_creation_trace

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityCreationTrace')
    def manage_enableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity creation trace.
        """
        self.activity_creation_trace = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
876
          url += urllib.parse.quote('Activity creation trace enabled')
877 878 879 880 881 882 883 884 885 886
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityCreationTrace')
    def manage_disableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity creation trace.
        """
        self.activity_creation_trace = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
887
          url += urllib.parse.quote('Activity creation trace disabled')
888 889
          RESPONSE.redirect(url)

890 891 892 893 894 895 896 897 898 899 900
    security.declareProtected(Permissions.manage_properties, 'isCancelAndInvokeLinksHidden')
    def isCancelAndInvokeLinksHidden(self):
      return self.cancel_and_invoke_links_hidden

    security.declareProtected(Permissions.manage_properties, 'manage_hideCancelAndInvokeLinks')
    def manage_hideCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
        """
        self.cancel_and_invoke_links_hidden = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
901
          url += urllib.parse.quote('Cancel and invoke links hidden')
902 903 904 905 906 907 908 909 910
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_showCancelAndInvokeLinks')
    def manage_showCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
        """
        self.cancel_and_invoke_links_hidden = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
911
          url += urllib.parse.quote('Cancel and invoke links visible')
912 913
          RESPONSE.redirect(url)

914
    security.declarePrivate('manage_beforeDelete')
915 916
    def manage_beforeDelete(self, item, container):
        self.unsubscribe()
917
        BaseTool.inheritedAttribute('manage_beforeDelete')(self, item, container)
918

919
    security.declarePrivate('manage_afterAdd')
920 921
    def manage_afterAdd(self, item, container):
        self.subscribe()
922
        BaseTool.inheritedAttribute('manage_afterAdd')(self, item, container)
923

924
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getServerAddress')
925 926 927 928
    def getServerAddress(self):
        """
        Backward-compatibility code only.
        """
929 930 931 932 933
        warnings.warn(
          '"getServerAddress" class method is deprecated, use "getServerAddress" module-level function instead.',
          DeprecationWarning,
          stacklevel=2,
        )
934
        return getServerAddress()
935

936
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNode')
937
    def getCurrentNode(self):
938 939 940
        """
        Backward-compatibility code only.
        """
941 942 943 944 945
        warnings.warn(
          '"getCurrentNode" class method is deprecated, use "getCurrentNode" module-level function instead.',
          DeprecationWarning,
          stacklevel=2,
        )
946
        return getCurrentNode()
947

948
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getDistributingNode')
949 950 951 952
    def getDistributingNode(self):
        """ Return the distributingNode """
        return self.distributingNode

953 954 955 956 957 958 959 960 961 962 963 964 965
    def getNodeList(self, role=None):
      node_dict = self.getNodeDict()
      if role is None:
        result = [x for x in node_dict.keys()]
      else:
        result = [node_id for node_id, node_role in node_dict.items() if node_role == role]
      result.sort()
      return result

    def getNodeDict(self):
      nodes = self._nodes
      if isinstance(nodes, tuple):
        new_nodes = OIBTree()
966
        new_nodes.update([(x, ROLE_PROCESSING) for x in nodes])
967 968 969
        self._nodes = nodes = new_nodes
      return nodes

970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023
    def _getNodeFamilyIdDict(self):
      result = self._node_family_dict
      if result is None:
        result = self._node_family_dict = OOBTree()
      return result

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNodeFamilyIdSet')
    def getCurrentNodeFamilyIdSet(self):
      """
      Returns the tuple of family ids current node is member of.
      """
      return self._getNodeFamilyIdDict().get(getCurrentNode(), ())

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNodeFamilyNameSet')
    def getCurrentNodeFamilyNameSet(self):
      """
      Returns the tuple of family names current node is member of.
      """
      return [
        self._family_list[-x - 1]
        for x in self._getNodeFamilyIdDict().get(getCurrentNode(), ())
      ]

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getFamilyId')
    def getFamilyId(self, name):
      """
      Raises ValueError for unknown family names.
      """
      # First family is -1, second is -2, etc.
      return -self._family_list.index(name) - 1

    security.declareProtected(CMFCorePermissions.ManagePortal, 'addNodeToFamily')
    def addNodeToFamily(self, node_id, family_name):
      """
      Silently does nothing if node is already a member of family_name.
      """
      family_id = self.getFamilyId(family_name)
      node_family_id_dict = self._getNodeFamilyIdDict()
      family_id_list = node_family_id_dict.get(node_id, ())
      if family_id not in family_id_list:
        node_family_id_dict[node_id] = family_id_list + (family_id, )

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addNodeSetToFamily')
    def manage_addNodeSetToFamily(self, family_new_node_list, REQUEST):
      """
      Add selected nodes to family.
      """
      family_name = REQUEST['manage_addNodeSetToFamily']
      if isinstance(family_new_node_list, basestring):
        family_new_node_list = [family_new_node_list]
      for node_id in family_new_node_list:
        self.addNodeToFamily(node_id, family_name)
      REQUEST.RESPONSE.redirect(
        REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message=' +
1024
        urllib.parse.quote('Nodes added to family.'),
1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054
      )

    security.declareProtected(CMFCorePermissions.ManagePortal, 'removeNodeFromFamily')
    def removeNodeFromFamily(self, node_id, family_name):
      """
      Silently does nothing if node is not member of family_name.
      """
      family_id = self.getFamilyId(family_name)
      node_family_id_dict = self._getNodeFamilyIdDict()
      family_id_list = node_family_id_dict.get(node_id, ())
      if family_id in family_id_list:
        node_family_id_dict[node_id] = tuple(
          x
          for x in family_id_list
          if x != family_id
        )

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeNodeSetFromFamily')
    def manage_removeNodeSetFromFamily(self, REQUEST):
      """
      Remove selected nodes from family.
      """
      family_name = REQUEST['manage_removeNodeSetFromFamily']
      node_to_remove_list = REQUEST['family_member_set_' + family_name]
      if isinstance(node_to_remove_list, basestring):
        node_to_remove_list = [node_to_remove_list]
      for node_id in node_to_remove_list:
        self.removeNodeFromFamily(node_id, family_name)
      REQUEST.RESPONSE.redirect(
        REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message=' +
1055
        urllib.parse.quote('Nodes removed from family.'),
1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
      )

    def _checkFamilyName(self, name):
      if not isinstance(name, basestring):
        raise TypeError('Name must be a string')
      if name in self._family_list:
        raise ValueError('Already in use')
      if name in ('', 'same'):
        raise ValueError('Reserved family name')

    security.declareProtected(CMFCorePermissions.ManagePortal, 'createFamily')
    def createFamily(self, name):
      """
      Raises ValueError if family already exists.
      """
      self._checkFamilyName(name)
      new_family_list = []
      for existing_name in self._family_list:
        if existing_name is None and name is not None:
          new_family_list.append(name)
          name = None
        else:
          new_family_list.append(existing_name)
      if name is None:
        # A free spot has been recycled.
        self._family_list = tuple(new_family_list)
      else:
        # No free spot, append.
        self._family_list += (name, )

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_createFamily')
1087
    def manage_createFamily(self, new_family_name, REQUEST, family_new_node_list=None):
1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098
      """Create a family"""
      redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
      if family_new_node_list is None:
        family_new_node_list = []
      elif isinstance(family_new_node_list, basestring):
        family_new_node_list = [family_new_node_list]
      try:
        self.createFamily(new_family_name)
        for node_id in family_new_node_list:
          self.addNodeToFamily(node_id, new_family_name)
      except ValueError as exc:
1099 1100
        raise Redirect(redirect_url + urllib.parse.quote(str(exc)))
      REQUEST.RESPONSE.redirect(redirect_url + urllib.parse.quote('Family created.'))
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124

    security.declareProtected(CMFCorePermissions.ManagePortal, 'renameFamily')
    def renameFamily(self, old_name, new_name):
      """
      Raises ValueError if old_name does not exist.
      """
      self._checkFamilyName(new_name)
      family_list = self._family_list
      if old_name not in family_list:
        raise ValueError('Unknown family')
      self._family_list = tuple(
        new_name if x == old_name else x
        for x in family_list
      )

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_renameFamily')
    def manage_renameFamily(self, REQUEST):
      """Rename a family"""
      redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
      old_family_name = REQUEST['manage_renameFamily']
      new_family_name = REQUEST['family_new_name_' + old_family_name]
      try:
        self.renameFamily(old_family_name, new_family_name)
      except ValueError as exc:
1125 1126
        raise Redirect(redirect_url + urllib.parse.quote(str(exc)))
      REQUEST.RESPONSE.redirect(redirect_url + urllib.parse.quote('Family renamed.'))
1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147

    security.declareProtected(CMFCorePermissions.ManagePortal, 'deleteFamily')
    def deleteFamily(self, name):
      """
      Raises ValueError if name does not exist.
      """
      for node_id in self._getNodeFamilyIdDict():
        self.removeNodeFromFamily(node_id, name)
      self._family_list = tuple(
        None if x == name else x
        for x in self._family_list
      )

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_deleteFamily')
    def manage_deleteFamily(self, REQUEST):
      """Delete families"""
      redirect_url = REQUEST.URL1 + '/manageLoadBalancing?manage_tabs_message='
      family_name = REQUEST['manage_deleteFamily']
      try:
        self.deleteFamily(family_name)
      except ValueError as exc:
1148 1149
        raise Redirect(redirect_url + urllib.parse.quote(str(exc)))
      REQUEST.RESPONSE.redirect(redirect_url + urllib.parse.quote('Family deleted'))
1150 1151 1152 1153 1154 1155 1156 1157

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getFamilyNameList')
    def getFamilyNameList(self):
      """
      Return the list of existing family names.
      """
      return [x for x in self._family_list if x is not None]

1158
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getFamilyNodeList')
1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169
    def getFamilyNodeList(self, family_name):
      """
      Return the list of node names in given family.
      """
      family_id = self.getFamilyId(family_name)
      return [
        x
        for x, y in self._getNodeFamilyIdDict().items()
        if family_id in y
      ]

1170 1171
    def registerNode(self, node):
      node_dict = self.getNodeDict()
1172 1173 1174 1175
      if node not in node_dict:
        if node_dict:
          # BBB: check if our node was known by address (processing and/or
          # distribution), and migrate it.
1176
          server_address = getServerAddress()
1177 1178 1179 1180 1181 1182
          role = node_dict.pop(server_address, ROLE_IDLE)
          if self.distributingNode == server_address:
            self.distributingNode = node
        else:
          # We are registering the first node, make
          # it both the distributing node and a processing node.
1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194
          role = ROLE_PROCESSING
          self.distributingNode = node
        self.updateNode(node, role)

    def updateNode(self, node, role):
      node_dict = self.getNodeDict()
      node_dict[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      return self.getNodeList(role=ROLE_PROCESSING)

1195
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
1196 1197
    def getIdleNodeList(self):
      return self.getNodeList(role=ROLE_IDLE)
1198

1199 1200
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name"""
1201
      return isinstance(node_name, str)
1202

1203
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_setDistributingNode')
1204
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
1205
        """ set the distributing node """
1206
        if not distributingNode or self._isValidNodeName(distributingNode):
1207 1208 1209 1210 1211
          self.distributingNode = distributingNode
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
1212
                  urllib.parse.quote("Distributing Node successfully changed."))
1213 1214 1215 1216 1217
        else :
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
1218
                  urllib.parse.quote("Malformed Distributing Node."))
1219

1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """ delete selected unused nodes """
      processing_node = self.getDistributingNode()
      updated_processing_node = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == processing_node:
            self.processing_node = ''
            updated_processing_node = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if updated_processing_node:
          message += "Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
1243
          urllib.parse.quote(message))
1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """ Change one or more idle nodes into processing nodes """
      if unused_node_list is not None:
        for node in unused_node_list:
          self.updateNode(node, ROLE_PROCESSING)
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing done."
        else:
          message = "Nodes now procesing: %r." % (unused_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
1259
          urllib.parse.quote(message))
1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """ Change one or more procesing nodes into idle nodes """
      if processing_node_list is not None:
        for node in processing_node_list:
          self.updateNode(node, ROLE_IDLE)
      if REQUEST is not None:
        if processing_node_list is None:
          message = "No used node selected, nothing done."
        else:
          message = "Nodes now unused %r." % (processing_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
1275
          urllib.parse.quote(message))
1276

1277
    security.declarePrivate('process_shutdown')
1278 1279 1280 1281 1282
    def process_shutdown(self, phase, time_in_phase):
        """
          Prevent shutdown from happening while an activity queue is
          processing a batch.
        """
1283
        global has_processed_shutdown
1284 1285
        if phase == 3 and not has_processed_shutdown:
          has_processed_shutdown = True
1286 1287 1288 1289
          LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
          is_running_lock.acquire()
          LOG('CMFActivity', INFO, "Shutdown: Activities finished.")

1290
    security.declareProtected(CMFCorePermissions.ManagePortal, 'process_timer')
1291
    def process_timer(self, tick, interval, prev="", next=""):
1292 1293 1294 1295 1296 1297 1298 1299
      """
      Call distribute() if we are the Distributing Node and call tic()
      with our node number.
      This method is called by TimerService in the interval given
      in zope.conf. The Default is every 5 seconds.
      """
      # Prevent TimerService from starting multiple threads in parallel
      if timerservice_lock.acquire(0):
1300
        try:
1301 1302 1303 1304 1305 1306
          # make sure our skin is set-up. On CMF 1.5 it's setup by acquisition,
          # but on 2.2 it's by traversal, and our site probably wasn't traversed
          # by the timerserver request, which goes into the Zope Control_Panel
          # calling it a second time is a harmless and cheap no-op.
          # both setupCurrentSkin and REQUEST are acquired from containers.
          self.setupCurrentSkin(self.REQUEST)
1307
          old_sm = getSecurityManager()
1308
          try:
1309 1310 1311 1312
            # get owner of portal_catalog, so normally we should be able to
            # have the permission to invoke all activities
            user = self.portal_catalog.getWrappedOwner()
            newSecurityManager(self.REQUEST, user)
1313

1314
            currentNode = getCurrentNode()
1315 1316 1317 1318
            self.registerNode(currentNode)
            processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

            # only distribute when we are the distributingNode
1319
            if self.getDistributingNode() == currentNode:
1320 1321 1322 1323 1324 1325
              self.distribute(len(processing_node_list))

            # SkinsTool uses a REQUEST cache to store skin objects, as
            # with TimerService we have the same REQUEST over multiple
            # portals, we clear this cache to make sure the cache doesn't
            # contains skins from another portal.
1326
            try:
1327 1328 1329 1330 1331 1332 1333 1334
              self.getPortalObject().portal_skins.changeSkin(None)
            except AttributeError:
              pass

            # call tic for the current processing_node
            # the processing_node numbers are the indices of the elements
            # in the node tuple +1 because processing_node starts form 1
            if currentNode in processing_node_list:
1335
              self.tic(processing_node_list.index(currentNode) + 1)
1336 1337 1338
          except:
            # Catch ALL exception to avoid killing timerserver.
            LOG('ActivityTool', ERROR, 'process_timer received an exception',
1339
                error=True)
1340 1341
          finally:
            setSecurityManager(old_sm)
Jérome Perrin's avatar
Jérome Perrin committed
1342
        finally:
1343
          timerservice_lock.release()
1344

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1345 1346 1347 1348 1349
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363
      inner_self = aq_inner(self)
      while is_running_lock.acquire(0):
        try:
          # Note: "has_more_to_distribute" is to be taken in a lose sense, we
          # do not positively know there is more, just that distribute returned
          # before it could confirm there is nothing left to do.
          has_more_to_distribute = False
          # Call distribute on each queue
          for activity in six.itervalues(activity_dict):
            has_more_to_distribute |= activity.distribute(inner_self, node_count)
          if not has_more_to_distribute:
            break
        finally:
          is_running_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1364

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1365
    security.declarePublic('tic')
1366
    def tic(self, processing_node=1, force=0):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1367 1368
      """
        Starts again an activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1369
        processing_node starts from 1 (there is not node 0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1370
      """
1371
      global active_threads
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1372 1373

      # return if the number of threads is too high
1374
      # else, increase the number of active_threads and continue
1375 1376 1377 1378 1379
      with tic_lock:
        too_many_threads = (active_threads >= max_active_threads)
        if not too_many_threads or force:
          active_threads += 1
        else:
1380
          raise RuntimeError('Too many threads')
1381

1382
      inner_self = aq_inner(self)
1383

1384
      try:
1385
        # Loop as long as there are activities. Always process the queue with
1386 1387
        # "highest" priority. If several queues have same highest priority,
        # use a round-robin algorithm.
1388 1389
        # XXX: We always finish by iterating over all queues, in case that
        #      getPriority does not see messages dequeueMessage would process.
1390
        activity_list = ensure_list(activity_dict.values())
1391
        def sort_key(activity):
1392 1393
          return activity.getPriority(self, processing_node,
            node_family_id_set)
1394 1395
        while is_running_lock.acquire(0):
          try:
1396 1397
            # May have changed since previous iteration.
            node_family_id_set = self.getCurrentNodeFamilyIdSet()
1398 1399
            activity_list.sort(key=sort_key) # stable sort
            for i, activity in enumerate(activity_list):
1400
              # Transaction processing is the responsability of the activity
1401
              if activity.dequeueMessage(inner_self, processing_node,
1402
                node_family_id_set):
1403
                activity_list.append(activity_list.pop(i))
1404 1405 1406 1407 1408
                break
            else:
              break
          finally:
            is_running_lock.release()
1409 1410
      finally:
        # decrease the number of active_threads
1411 1412
        with tic_lock:
          active_threads -= 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1413

1414
    def hasActivity(self, *args, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1415
      # Check in each queue if the object has deferred tasks
1416
      # if not argument is provided, then check on self
1417 1418
      if args:
        obj, = args
1419
      else:
1420
        obj = self
1421 1422 1423
      path = None if obj is None else '/'.join(obj.getPhysicalPath())
      db = self.getSQLConnection()
      quote = db.string_literal
1424
      return bool(db.query(b"(%s)" % b") UNION ALL (".join(
1425
        activity.hasActivitySQL(quote, path=path, **kw)
1426
        for activity in six.itervalues(activity_dict)))[1])
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1427

1428
    security.declarePrivate('getActivityBuffer')
1429 1430 1431 1432 1433 1434 1435 1436
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get activtity buffer for this thread for this activity tool.
        If no activity buffer is found at lowest level and create_if_not_found
        is True, create one.
        Intermediate level is unconditionaly created if non existant because
        chances are it will be used in the instance life.
      """
1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451
      # XXX: using a volatile attribute to cache getPhysicalPath result.
      # This cache may need invalidation if all the following is
      # simultaneously true:
      # - ActivityTool instances can be moved in object tree
      # - moved instance is used to get access to its activity buffer
      # - another instance is put in the place of the original, and used to
      #   access its activity buffer
      # ...which seems currently unlikely, and as such is left out.
      try:
        my_instance_key = self._v_physical_path
      except AttributeError:
        # Safeguard: make sure we are wrapped in acquisition context before
        # using our path as an activity tool instance-wide identifier.
        assert getattr(self, 'aq_self', None) is not None
        self._v_physical_path = my_instance_key = self.getPhysicalPath()
1452
      thread_activity_buffer = global_activity_buffer[my_instance_key]
1453
      my_thread_key = get_ident()
1454 1455 1456
      try:
        return thread_activity_buffer[my_thread_key]
      except KeyError:
1457
        if create_if_not_found:
1458
          buffer = ActivityBuffer()
1459 1460 1461
        else:
          buffer = None
        thread_activity_buffer[my_thread_key] = buffer
1462
        return buffer
1463

1464 1465
    def activateObject(self, object, activity=DEFAULT_ACTIVITY,
                       active_process=None, serialization_tag=None,
1466
                       node=None, uid=None, **kw):
1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477
      if active_process is None:
        active_process_uid = None
      elif isinstance(active_process, str):
        # TODO: deprecate
        active_process_uid = self.unrestrictedTraverse(active_process).getUid()
      else:
        active_process_uid = active_process.getUid()
        active_process = active_process.getPhysicalPath()
      if isinstance(object, str):
        url = tuple(object.split('/'))
      else:
1478 1479 1480
        if uid is not None:
          raise ValueError
        uid = getattr(aq_base(object), 'uid', None)
1481
        url = object.getPhysicalPath()
1482 1483
      if serialization_tag is not None:
        kw['serialization_tag'] = serialization_tag
1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495
      while 1: # not a loop
        if node is None:
          # The caller lets us decide whether we prefer to execute on same node
          # (to increase the efficiency of the ZODB Storage cache).
          if (isinstance(object, NO_DEFAULT_NODE_PREFERENCE)
              # A grouped activity is the sign we may have many of them so make
              # sure that this node won't overprioritize too many activities.
              or kw.get('group_method_id', '') != ''):
            break
        elif node == '':
          break
        elif node != 'same':
1496 1497
          kw['node'] = self.getFamilyId(node)
          break
1498
        try:
1499 1500
          kw['node'] = 1 + self.getNodeList(
            role=ROLE_PROCESSING).index(getCurrentNode())
1501 1502
        except ValueError:
          pass
1503
        break
1504
      return ActiveWrapper(self, url, uid, activity,
1505
                           active_process, active_process_uid, kw,
1506
                           getattr(self, 'REQUEST', None))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1507

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1508
    def getRegisteredMessageList(self, activity):
1509
      activity_buffer = self.getActivityBuffer(create_if_not_found=False)
1510
      if activity_buffer is not None:
1511 1512
        #activity_buffer._register() # This is required if flush flush is called outside activate
        return activity.getRegisteredMessageList(activity_buffer,
1513
                                                 aq_inner(self))
1514 1515
      else:
        return []
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1516

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1517
    def unregisterMessage(self, activity, message):
1518 1519 1520
      activity_buffer = self.getActivityBuffer()
      #activity_buffer._register()
      return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1521

1522
    def flush(self, obj, invoke=0, **kw):
1523
      self.getActivityBuffer()
1524 1525
      if isinstance(obj, tuple):
        object_path = obj
1526
      else:
1527
        object_path = obj.getPhysicalPath()
1528
      for activity in six.itervalues(activity_dict):
1529
        activity.flush(aq_inner(self), object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1530 1531

    def invoke(self, message):
1532
      if self.activity_tracking:
1533
        activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.getUserId()))
1534 1535
      if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extrach base objects.
1536
        base_chain = [aq_base(x) for x in self.aq_chain]
1537 1538 1539
        # Grab existig request (last chain item) and create a copy.
        request_container = base_chain.pop()
        request = request_container.REQUEST
1540 1541 1542 1543 1544 1545 1546 1547
        # Generate PARENTS value. Sadly, we cannot reuse base_chain since
        # PARENTS items must be wrapped in acquisition
        parents = []
        application = self.getPhysicalRoot().aq_base
        for parent in self.aq_chain:
          if parent.aq_base is application:
            break
          parents.append(parent)
1548 1549
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # runing unit tests. Recreate it if it does not exist.
1550 1551
        if getattr(request.other, 'PARENTS', None) is None:
          request.other['PARENTS'] = parents
1552
        # XXX: PATH_INFO might not be set when runing unit tests.
1553
        if request.environ.get('PATH_INFO') is None:
1554
          request.environ['PATH_INFO'] = '/Control_Panel/timer_service/process_timer'
1555

1556
        # restore request information
1557
        old_request = getRequest()
1558
        new_request = request.clone()
1559
        setRequest(new_request)
1560
        request_info = message.request_info
1561 1562
        # PARENTS is truncated by clone
        new_request.other['PARENTS'] = parents
1563 1564
        if '_script' in request_info:
          new_request._script = request_info['_script']
1565
        if 'SERVER_URL' in request_info:
1566
          new_request.other['SERVER_URL'] = request_info['SERVER_URL']
1567 1568 1569
        if 'VirtualRootPhysicalPath' in request_info:
          new_request.other['VirtualRootPhysicalPath'] = request_info['VirtualRootPhysicalPath']
        if 'HTTP_ACCEPT_LANGUAGE' in request_info:
1570
          new_request.environ['HTTP_ACCEPT_LANGUAGE'] = request_info['HTTP_ACCEPT_LANGUAGE']
1571
          new_request.processInputs()
1572 1573

        new_request_container = request_container.__class__(REQUEST=new_request)
1574 1575 1576 1577 1578 1579 1580 1581
        # Recreate acquisition chain.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
1582 1583 1584
      try:
        message(my_self)
      finally:
1585 1586 1587 1588
        if my_self is not self: # We rewrapped self
          # Restore default skin selection
          skinnable = self.getPortalObject()
          skinnable.changeSkin(skinnable.getSkinNameFromRequest(request))
1589
        setRequest(old_request)
1590
      if self.activity_tracking:
1591
        activity_tracking_logger.info('invoked message')
1592 1593 1594
      if my_self is not self: # We rewrapped self
        for held in my_self.REQUEST._held:
          self.REQUEST._hold(held)
1595

1596
    def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
1597
      if self.activity_tracking:
1598 1599 1600
        activity_tracking_logger.info(
          'invoking group messages: method_id=%s, paths=%s'
          % (method_id, ['/'.join(m.object_path) for m in message_list]))
1601
      # Invoke a group method.
1602
      message_dict = {}
1603
      path_set = set()
1604 1605 1606
      # Filter the list of messages. If an object is not available, mark its
      # message as non-executable. In addition, expand an object if necessary,
      # and make sure that no duplication happens.
1607
      for m in message_list:
1608 1609
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
1610
        try:
1611 1612 1613
          object_list = m.getObjectList(self)
          if object_list is None:
            continue
1614
          message_dict[m] = expanded_object_list = []
1615
          for subobj in object_list:
1616 1617 1618 1619
            if merge_duplicate:
              path = subobj.getPath()
              if path in path_set:
                continue
1620
              path_set.add(path)
1621 1622 1623 1624 1625 1626 1627 1628 1629 1630
            if alternate_method_id is not None \
               and hasattr(aq_base(subobj), alternate_method_id):
              # if this object is alternated,
              # generate a new single active object
              activity_kw = m.activity_kw.copy()
              activity_kw.pop('group_method_id', None)
              activity_kw.pop('group_id', None)
              active_obj = subobj.activate(activity=activity, **activity_kw)
              getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
            else:
1631
              expanded_object_list.append(GroupedMessage(subobj, m))
1632
        except:
1633
          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
1634

1635
      expanded_object_list = sum(six.itervalues(message_dict), [])
1636
      try:
1637
        if expanded_object_list:
1638 1639
          # Store site info
          setSite(self.getParentValue())
1640
          traverse = self.getPortalObject().unrestrictedTraverse
1641
          # FIXME: how to apply security here?
1642
          # NOTE: The callee must update each processed item of
1643 1644 1645 1646 1647
          #       expanded_object_list, by setting:
          #       - 'exc_info' in case of error
          #       - 'result' otherwise, with None or the result to post
          #          on the active process
          #       Skipped item must not be touched.
1648
          traverse(method_id)(expanded_object_list)
1649 1650
      except:
        # In this case, the group method completely failed.
1651
        exc_info = sys.exc_info()
1652
        for m in message_dict:
1653
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
1654
        LOG('WARNING ActivityTool', 0,
1655
            'Could not call method %s on objects %s' %
1656 1657
            (method_id, [x.object for x in expanded_object_list]),
            error=exc_info)
1658 1659 1660
        error_log = getattr(self, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
1661
      else:
1662
        # Note there can be partial failures.
1663
        for m, expanded_object_list in six.iteritems(message_dict):
1664 1665
          result_list = []
          for result in expanded_object_list:
1666 1667 1668 1669 1670 1671
            try:
              if result.result is not None:
                result_list.append(result)
            except AttributeError:
              exc_info = getattr(result, "exc_info", (SkippedMessage,))
              break # failed or skipped message
1672 1673
          else:
            try:
1674 1675 1676
              if result_list and m.active_process:
                active_process = traverse(m.active_process)
                for result in result_list:
1677
                  m.activateResult(active_process, result.result, result.object)
1678
            except:
1679
              exc_info = None
1680
            else:
1681
              m.setExecutionState(MESSAGE_EXECUTED, context=self)
1682
              continue
1683
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, context=self)
1684
      exc_info = None
1685
      if self.activity_tracking:
1686
        activity_tracking_logger.info('invoked group messages')
1687

1688 1689 1690 1691
    security.declarePrivate('dummyGroupMethod')
    class dummyGroupMethod(object):
      def __bobo_traverse__(self, REQUEST, method_id):
        def group_method(message_list):
1692
          sm = getSecurityManager()
1693 1694
          try:
            for m in message_list:
1695
              m._message.changeUser(m.object, annotate_transaction=False)
1696
              m.result = getattr(m.object, method_id)(*m.args, **m.kw)
1697
          except Exception:
1698
            m.raised()
1699 1700
          finally:
            setSecurityManager(sm)
1701 1702 1703
        return group_method
    dummyGroupMethod = dummyGroupMethod()

1704 1705
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
1706
      # Some Security Cheking should be made here XXX
1707
      self.getActivityBuffer()
1708
      activity_dict[activity].queueMessage(aq_inner(self),
1709 1710
        Message(path, active_process, activity_kw, method_id, args, kw,
          portal_activities=self))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1711

1712
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1713 1714 1715 1716 1717 1718
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
1719
      self.flush(object_path,method_id=method_id,invoke=1)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1720
      if REQUEST is not None:
1721 1722
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1723

1724
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageRestart')
1725 1726 1727 1728
    def manageRestart(self, message_uid_list, activity, REQUEST=None):
      """
        Restart one or several messages
      """
Sebastien Robin's avatar
Sebastien Robin committed
1729 1730
      if not(isinstance(message_uid_list, list)):
        message_uid_list = [message_uid_list]
1731
      if message_uid_list:
1732
        activity_dict[activity].assignMessageList(self.getSQLConnection(),
1733
                                                     0, message_uid_list)
1734 1735 1736 1737
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (
          self.absolute_url(), 'view'))

1738
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"
      """
      LOG('ActivityTool', WARNING,
          '"manageCancel" method is deprecated, use "manageDelete" instead.')
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path,method_id=method_id,invoke=0)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (
1750
          self.absolute_url(), 'manageActivities'))
1751 1752 1753 1754 1755 1756

    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageDelete' )
    def manageDelete(self, message_uid_list, activity, REQUEST=None):
      """
        Delete one or several messages
      """
Sebastien Robin's avatar
Sebastien Robin committed
1757 1758
      if not(isinstance(message_uid_list, list)):
        message_uid_list = [message_uid_list]
1759 1760
      activity_dict[activity].deleteMessageList(
        self.getSQLConnection(), message_uid_list)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1761
      if REQUEST is not None:
1762 1763
        return REQUEST.RESPONSE.redirect('%s/%s' % (
          self.absolute_url(), 'view'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1764

1765 1766
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
1767
    def manageClearActivities(self, keep=1, RESPONSE=None):
1768
      """
1769
        Recreate tables, clearing all activities
1770
      """
1771
      for activity in six.itervalues(activity_dict):
1772
        activity.initialize(self, clear=True)
1773

1774 1775 1776
      if RESPONSE is not None:
        return RESPONSE.redirect(self.absolute_url_path() +
          '/manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared')
1777

1778 1779 1780

    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'getMessageTempObjectList')
1781 1782 1783 1784 1785 1786 1787 1788
    def getMessageTempObjectList(self, **kw):
      """
        Get object list of messages waiting in queues
      """
      message_list = self.getMessageList(**kw)
      object_list = []
      for sql_message in message_list:
        message = self.newContent(temp_object=1)
1789
        message.__dict__.update(**sql_message.__dict__)
1790 1791 1792
        object_list.append(message)
      return object_list

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1793
    security.declarePublic('getMessageList')
1794
    def getMessageList(self, activity=None, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1795 1796 1797
      """
        List messages waiting in queues
      """
1798 1799
      if activity:
        return activity_dict[activity].getMessageList(aq_inner(self), **kw)
1800

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1801
      message_list = []
1802
      for activity in six.itervalues(activity_dict):
Sebastien Robin's avatar
Sebastien Robin committed
1803
        try:
1804
          message_list += activity.getMessageList(aq_inner(self), **kw)
Sebastien Robin's avatar
Sebastien Robin committed
1805 1806
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,activity)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1807 1808
      return message_list

1809 1810 1811 1812 1813
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag.
      """
1814
      return self.countMessage(tag=value)
Sebastien Robin's avatar
Sebastien Robin committed
1815 1816 1817 1818 1819 1820 1821 1822 1823

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameter.

        Parameters allowed:

        method_id : the id of the method
Jérome Perrin's avatar
Jérome Perrin committed
1824
        path : for activities on a particular object
Sebastien Robin's avatar
Sebastien Robin committed
1825 1826 1827
        tag : activities with a particular tag
        message_uid : activities with a particular uid
      """
1828 1829 1830 1831
      db = self.getSQLConnection()
      quote = db.string_literal
      return sum(x for x, in db.query("(%s)" % ") UNION ALL (".join(
        activity.countMessageSQL(quote, **kw)
1832
        for activity in six.itervalues(activity_dict)))[1])
1833

1834
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1835
    def newActiveProcess(self, REQUEST=None, **kw):
1836 1837
      # note: if one wants to create an Actice Process without ERP5 products,
      # she can call ActiveProcess.addActiveProcess
1838
      obj = self.newContent(portal_type="Active Process", **kw)
1839 1840 1841
      if REQUEST is not None:
        REQUEST['RESPONSE'].redirect( 'manage_main' )
      return obj
1842

1843 1844
    security.declarePrivate('getSQLTableNameSet')
    def getSQLTableNameSet(self):
1845
      return [x.sql_table for x in six.itervalues(activity_dict)]
1846

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1847 1848
    # Required for tests (time shift)
    def timeShift(self, delay):
1849
      for activity in six.itervalues(activity_dict):
1850
        activity.timeShift(aq_inner(self), delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1851

1852
InitializeClass(ActivityTool)