ActivityTool.py 61.9 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32
import socket
import urllib
import threading
import sys
Vincent Pelletier's avatar
Vincent Pelletier committed
33
from types import StringType
34
from collections import defaultdict
35
from cPickle import dumps, loads
36
from Products.CMFCore import permissions as CMFCorePermissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.ERP5Type.Core.Folder import Folder
38
from Products.CMFActivity.ActiveResult import ActiveResult
39
from Products.CMFActivity.ActiveObject import DEFAULT_ACTIVITY
40
from Products.CMFActivity.ActivityConnection import ActivityConnection
41
from Products.PythonScripts.Utility import allow_class
42
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
43 44 45 46
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
47
from AccessControl.User import system as system_user
48
from Products.CMFCore.utils import UniqueObject
49
from Products.ERP5Type.Globals import InitializeClass, DTMLFile
50
from Acquisition import aq_base, aq_inner, aq_parent
51
from ActivityBuffer import ActivityBuffer
52
from ActivityRuntimeEnvironment import BaseMessage
53
from zExceptions import ExceptionFormatter
54
from BTrees.OIBTree import OIBTree
55 56
from Zope2 import app
from Products.ERP5Type.UnrestrictedMethod import PrivilegedUser
57
from zope.site.hooks import setSite
58
import transaction
59
from App.config import getConfiguration
60

61 62 63 64
import Products.Localizer.patches
localizer_lock = Products.Localizer.patches._requests_lock
localizer_contexts = Products.Localizer.patches._requests
LocalizerContext = lambda request: request
65

66

67
from ZODB.POSException import ConflictError
68
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
69

70
from zLOG import LOG, INFO, WARNING, ERROR
71
import warnings
72
from time import time
73 74

try:
75
  from Products.TimerService import getTimerService
76
except ImportError:
77 78
  def getTimerService(self):
    pass
Jean-Paul Smets's avatar
Jean-Paul Smets committed
79

80
from traceback import format_list, extract_stack
81

Jean-Paul Smets's avatar
Jean-Paul Smets committed
82 83 84 85
# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
86 87
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
88
is_running_lock = threading.Lock()
89
currentNode = None
90
_server_address = None
91 92
ROLE_IDLE = 0
ROLE_PROCESSING = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
93

94 95 96 97 98
# Logging channel definitions
import logging
# Main logging channel
activity_logger = logging.getLogger('CMFActivity')
# Some logging subchannels
99
activity_tracking_logger = logging.getLogger('Tracking')
100
activity_timing_logger = logging.getLogger('CMFActivity.TimingLog')
101 102 103 104 105 106 107 108 109 110

# Direct logging to "[instancehome]/log/CMFActivity.log", if this directory exists.
# Otherwise, it will end up in root logging facility (ie, event.log).
from App.config import getConfiguration
import os
instancehome = getConfiguration().instancehome
if instancehome is not None:
  log_directory = os.path.join(instancehome, 'log')
  if os.path.isdir(log_directory):
    from Signals import Signals
111 112
    from ZConfig.components.logger.loghandler import FileHandler
    log_file_handler = FileHandler(os.path.join(log_directory, 'CMFActivity.log'))
113 114 115 116 117 118 119 120
    # Default zope log format string borrowed from
    # ZConfig/components/logger/factory.xml, but without the extra "------"
    # line separating entries.
    log_file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s", "%Y-%m-%dT%H:%M:%S"))
    Signals.registerZopeSignals([log_file_handler])
    activity_logger.addHandler(log_file_handler)
    activity_logger.propagate = 0

121 122 123 124 125 126 127 128
def activity_timing_method(method, args, kw):
  begin = time()
  try:
    return method(*args, **kw)
  finally:
    end = time()
    activity_timing_logger.info('%.02fs: %r(*%r, **%r)' % (end - begin, method, args, kw))

129 130 131
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
132 133
global_activity_buffer = defaultdict(dict)
from thread import get_ident
134

135 136 137 138
MESSAGE_NOT_EXECUTED = 0
MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2

139

140 141 142 143
class SkippedMessage(Exception):
  pass


144
class Message(BaseMessage):
145
  """Activity Message Class.
146

147 148
  Message instances are stored in an activity queue, inside the Activity Tool.
  """
149

150
  active_process = None
151
  active_process_uid = None
152
  call_traceback = None
153
  exc_info = None
154 155 156
  is_executed = MESSAGE_NOT_EXECUTED
  processing = None
  traceback = None
157
  oid = None
158
  is_registered = False
159

160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
  def __init__(
      self,
      url,
      oid,
      active_process,
      active_process_uid,
      activity_kw,
      method_id,
      args, kw,
      request=None,
      portal_activities=None,
    ):
    self.object_path = url
    self.oid = oid
    self.active_process = active_process
    self.active_process_uid = active_process_uid
Jean-Paul Smets's avatar
Jean-Paul Smets committed
176 177 178 179
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
180
    if getattr(portal_activities, 'activity_creation_trace', False):
181 182 183 184
      # Save current traceback, to make it possible to tell where a message
      # was generated.
      # Strip last stack entry, since it will always be the same.
      self.call_traceback = ''.join(format_list(extract_stack()[:-1]))
185
    self.user_name = getSecurityManager().getUser().getIdOrUserName()
186
    # Store REQUEST Info
187
    self.request_info = {}
188
    if request is not None:
189 190 191 192 193 194 195 196 197
      if 'SERVER_URL' in request.other:
        self.request_info['SERVER_URL'] = request.other['SERVER_URL']
      if 'VirtualRootPhysicalPath' in request.other:
        self.request_info['VirtualRootPhysicalPath'] = \
          request.other['VirtualRootPhysicalPath']
      if 'HTTP_ACCEPT_LANGUAGE' in request.environ:
        self.request_info['HTTP_ACCEPT_LANGUAGE'] = \
          request.environ['HTTP_ACCEPT_LANGUAGE']
      self.request_info['_script'] = list(request._script)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
198

199 200 201 202 203 204 205 206
  @staticmethod
  def load(s, **kw):
    self = loads(s)
    self.__dict__.update(kw)
    return self

  dump = dumps

207 208 209 210 211 212 213
  def getGroupId(self):
    get = self.activity_kw.get
    group_method_id = get('group_method_id', '')
    if group_method_id is None:
      group_method_id = 'portal_activities/dummyGroupMethod/' + self.method_id
    return group_method_id + '\0' + get('group_id', '')

214 215 216 217 218 219
  def _getObject(self, activity_tool):
    obj = activity_tool.getPhysicalRoot()
    for id in self.object_path[1:]:
      obj = obj[id]
    return obj

220
  def getObject(self, activity_tool):
221
    """return the object referenced in this message."""
222
    try:
223
      obj = self._getObject(activity_tool)
224
    except KeyError:
225 226 227
      LOG('CMFActivity', WARNING, "Message dropped (no object found at path %r)"
          % (self.object_path,), error=sys.exc_info())
      self.setExecutionState(MESSAGE_NOT_EXECUTABLE)
228
    else:
229 230 231
      if (self.oid and self.oid != getattr(aq_base(obj), '_p_oid', None) and
          # XXX: BusinessTemplate must be fixed to preserve OID
          'portal_workflow' not in self.object_path):
232 233 234 235 236
        raise ValueError("OID mismatch for %r" % obj)
      return obj

  def getObjectList(self, activity_tool):
    """return the list of object that can be expanded from this message
237 238 239 240
    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls)."""
241 242 243 244 245 246 247 248 249 250
    obj = self.getObject(activity_tool)
    if obj is None:
      return ()
    if 'expand_method_id' in self.activity_kw:
      return getattr(obj, self.activity_kw['expand_method_id'])()
    return obj,

  def getObjectCount(self, activity_tool):
    if 'expand_method_id' in self.activity_kw:
      try:
251
        obj = self._getObject(activity_tool)
252 253 254 255
        return len(getattr(obj, self.activity_kw['expand_method_id'])())
      except StandardError:
        pass
    return 1
256

257
  def changeUser(self, user_name, activity_tool):
258
    """restore the security context for the calling user."""
259 260 261
    portal = activity_tool.getPortalObject()
    portal_uf = portal.acl_users
    uf = portal_uf
262
    user = uf.getUserById(user_name)
263
    # if the user is not found, try to get it from a parent acl_users
264
    # XXX this is still far from perfect, because we need to store all
265
    # information about the user (like original user folder, roles) to
266 267
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
268
    if user is None:
269
      uf = portal.aq_parent.acl_users
270
      user = uf.getUserById(user_name)
271
    if user is None and user_name == system_user.getUserName():
272 273 274 275
      # The following logic partly comes from unrestricted_apply()
      # implementation in ERP5Type.UnrestrictedMethod but we get roles
      # from the portal to have more roles.
      uf = portal_uf
276 277
      role_list = uf.valid_roles()
      user = PrivilegedUser(user_name, None, role_list, ()).__of__(uf)
278 279 280
    if user is not None:
      user = user.__of__(uf)
      newSecurityManager(None, user)
281
      transaction.get().setUser(user_name, '/'.join(uf.getPhysicalPath()))
282
    else :
283
      LOG("CMFActivity", WARNING,
284
          "Unable to find user %r in the portal" % user_name)
285
      noSecurityManager()
286 287
    return user

288 289 290
  def activateResult(self, active_process, result, object):
    if not isinstance(result, ActiveResult):
      result = ActiveResult(result=result)
291 292 293
    signature = self.activity_kw.get('signature')
    if signature:
      result.edit(id=signature)
294 295
    # XXX Allow other method_id in future
    result.edit(object_path=object, method_id=self.method_id)
296
    active_process.postResult(result)
297

Jean-Paul Smets's avatar
Jean-Paul Smets committed
298
  def __call__(self, activity_tool):
299
    try:
300
      obj = self.getObject(activity_tool)
301
      if obj is not None:
302 303
        old_security_manager = getSecurityManager()
        try:
304 305 306
          # Change user if required (TO BE DONE)
          # We will change the user only in order to execute this method
          self.changeUser(self.user_name, activity_tool)
307 308 309
          # XXX: There is no check to see if user is allowed to access
          #      that method !
          method = getattr(obj, self.method_id)
310 311 312
          transaction.get().note(
            'CMFActivity ' + '/'.join(self.object_path) + '/' + self.method_id
          )
313 314 315 316
          # Store site info
          setSite(activity_tool.getParentValue())
          if activity_tool.activity_timing_log:
            result = activity_timing_method(method, self.args, self.kw)
317
          else:
318
            result = method(*self.args, **self.kw)
319 320 321 322
        finally:
          setSecurityManager(old_security_manager)

        if method is not None:
323 324 325 326
          if self.active_process and result is not None:
            self.activateResult(
              activity_tool.unrestrictedTraverse(self.active_process),
              result, obj)
327
          self.setExecutionState(MESSAGE_EXECUTED)
328 329
    except:
      self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
330

331 332 333 334 335
  def validate(self, activity, activity_tool, check_order_validation=1):
    return activity.validate(activity_tool, self,
                             check_order_validation=check_order_validation,
                             **self.activity_kw)

336
  def notifyUser(self, activity_tool, retry=False):
337 338
    """Notify the user that the activity failed."""
    portal = activity_tool.getPortalObject()
339
    user_email = portal.getProperty('email_to_address',
340
                       portal.getProperty('email_from_address'))
341 342
    email_from_name = portal.getProperty('email_from_name',
                       portal.getProperty('email_from_address'))
343
    fail_count = self.line.retry + 1
344
    if retry:
345 346 347 348
      message = "Pending activity already failed %s times" % fail_count
    else:
      message = "Activity failed"
    path = '/'.join(self.object_path)
349
    mail_text = """From: %s <%s>
350
To: %s
351
Subject: %s: %s/%s
352

353
Node: %s
354
Failures: %s
355
User name: %r
356
Uid: %u
357 358
Document: %s
Method: %s
359 360
Arguments: %r
Named Parameters: %r
361 362
""" % (email_from_name, activity_tool.email_from_address, user_email, message,
       path, self.method_id, activity_tool.getCurrentNode(), fail_count,
363
       self.user_name, self.line.uid, path, self.method_id, self.args, self.kw)
364 365 366 367
    if self.traceback:
      mail_text += '\nException:\n' + self.traceback
    if self.call_traceback:
      mail_text += '\nCreated at:\n' + self.call_traceback
368
    try:
369
      portal.MailHost.send(mail_text)
Vincent Pelletier's avatar
Vincent Pelletier committed
370
    except (socket.error, MailHostError), message:
371 372
      LOG('ActivityTool.notifyUser', WARNING,
          'Mail containing failure information failed to be sent: %s' % message)
373

374
  def reactivate(self, activity_tool, activity=DEFAULT_ACTIVITY):
375
    # Reactivate the original object.
376
    obj = self._getObject(activity_tool)
377
    old_security_manager = getSecurityManager()
378
    try:
379 380 381
      # Change user if required (TO BE DONE)
      # We will change the user only in order to execute this method
      user = self.changeUser(self.user_name, activity_tool)
382
      active_obj = obj.activate(activity=activity, **self.activity_kw)
383 384 385
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
386
      setSecurityManager(old_security_manager)
387

388 389 390 391 392 393
  def setExecutionState(self, is_executed, exc_info=None, log=True, context=None):
    """
      Set message execution state.

      is_executed can be one of MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED and
      MESSAGE_NOT_EXECUTABLE (variables defined above).
394

395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
      exc_info must be - if given - similar to sys.exc_info() return value.

      log must be - if given - True or False. If True, a log line will be
      emited with failure details. This parameter should only be used when
      invoking this method on a list of messages to avoid log flood. It is
      caller's responsability to output a log line summing up all errors, and
      to store error in Zope's error_log.

      context must be - if given - an object wrapped in acquisition context.
      It is used to access Zope's error_log object. It is not used if log is
      False.

      If given state is not MESSAGE_EXECUTED, it will also store given
      exc_info. If not given, it will extract one using sys.exc_info().
      If final exc_info does not contain any exception, current stack trace
      will be stored instead: it will hopefuly help understand why message
      is in an error state.
    """
    assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE)
    self.is_executed = is_executed
415
    if is_executed == MESSAGE_NOT_EXECUTED:
416
      if not exc_info:
417
        exc_info = sys.exc_info()
418 419
      if self.on_error_callback is not None:
        self.exc_info = exc_info
420 421
      self.exc_type = exc_info[0]
      if exc_info[0] is None:
422 423 424
        # Raise a dummy exception, ignore it, fetch it and use it as if it was the error causing message non-execution. This will help identifyting the cause of this misbehaviour.
        try:
          raise Exception, 'Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.'
425 426
        except Exception:
          exc_info = sys.exc_info()
427 428
      elif exc_info[0] is SkippedMessage:
        return
429 430 431 432 433
      if log:
        LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info)
        # push the error in ZODB error_log
        error_log = getattr(context, 'error_log', None)
        if error_log is not None:
434
          error_log.raising(exc_info)
435
      self.traceback = ''.join(ExceptionFormatter.format_exception(*exc_info)[1:])
436 437 438 439

  def getExecutionState(self):
    return self.is_executed

440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
class GroupedMessage(object):
  __slots__ = 'object', '_message', 'result', 'exc_info'

  def __init__(self, object, message):
    self.object = object
    self._message = message

  args = property(lambda self: self._message.args)
  kw = property(lambda self: self._message.kw)

  def raised(self, exc_info=None):
    self.exc_info = exc_info or sys.exc_info()
    try:
      del self.result
    except AttributeError:
      pass

# XXX: Allowing restricted code to implement a grouping method is questionable
#      but there already exist some.
459
  __parent__ = property(lambda self: self.object) # for object
460
  _guarded_writes = 1 # for result
461
allow_class(GroupedMessage)
462 463 464

# Activity Registration
def activity_dict():
465
  from Activity import SQLDict, SQLQueue, SQLJoblib
466 467 468 469
  return {k: getattr(v, k)() for k, v in locals().iteritems()}
activity_dict = activity_dict()


Vincent Pelletier's avatar
Vincent Pelletier committed
470 471
class Method(object):
  __slots__ = (
472 473 474 475 476 477 478 479 480
    '_portal_activities',
    '_passive_url',
    '_passive_oid',
    '_activity',
    '_active_process',
    '_active_process_uid',
    '_kw',
    '_method_id',
    '_request',
Vincent Pelletier's avatar
Vincent Pelletier committed
481
  )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
482

483 484
  def __init__(self, portal_activities, passive_url, passive_oid, activity,
      active_process, active_process_uid, kw, method_id, request):
485 486 487 488 489 490 491 492 493
    self._portal_activities = portal_activities
    self._passive_url = passive_url
    self._passive_oid = passive_oid
    self._activity = activity
    self._active_process = active_process
    self._active_process_uid = active_process_uid
    self._kw = kw
    self._method_id = method_id
    self._request = request
Jean-Paul Smets's avatar
Jean-Paul Smets committed
494 495

  def __call__(self, *args, **kw):
496
    portal_activities = self._portal_activities
497
    m = Message(
498 499 500 501 502 503
      url=self._passive_url,
      oid=self._passive_oid,
      active_process=self._active_process,
      active_process_uid=self._active_process_uid,
      activity_kw=self._kw,
      method_id=self._method_id,
504 505
      args=args,
      kw=kw,
506
      request=self._request,
507 508
      portal_activities=portal_activities,
    )
509
    portal_activities.getActivityBuffer().deferredQueueMessage(
510
      portal_activities, activity_dict[self._activity], m)
511 512
    if portal_activities.activity_tracking and m.is_registered:
      activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self._activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
513

514 515
allow_class(Method)

Vincent Pelletier's avatar
Vincent Pelletier committed
516 517 518
class ActiveWrapper(object):
  __slots__ = (
    '__portal_activities',
519 520
    '__passive_url',
    '__passive_oid',
Vincent Pelletier's avatar
Vincent Pelletier committed
521 522
    '__activity',
    '__active_process',
523
    '__active_process_uid',
Vincent Pelletier's avatar
Vincent Pelletier committed
524 525 526
    '__kw',
    '__request',
  )
527 528 529
  # Shortcut security lookup (avoid calling __getattr__)
  __parent__ = None

530 531
  def __init__(self, portal_activities, url, oid, activity, active_process,
      active_process_uid, kw, request):
532
    # second parameter can be an object or an object's path
533
    self.__portal_activities = portal_activities
534 535
    self.__passive_url = url
    self.__passive_oid = oid
536 537
    self.__activity = activity
    self.__active_process = active_process
538
    self.__active_process_uid = active_process_uid
539 540 541 542 543 544
    self.__kw = kw
    self.__request = request

  def __getattr__(self, name):
    return Method(
      self.__portal_activities,
545 546
      self.__passive_url,
      self.__passive_oid,
547 548
      self.__activity,
      self.__active_process,
549
      self.__active_process_uid,
550 551 552 553
      self.__kw,
      name,
      self.__request,
    )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
554

555
  def __repr__(self):
556 557
    return '<%s at 0x%x to %s>' % (self.__class__.__name__, id(self),
                                   self.__passive_url)
558

559 560 561
# True when activities cannot be executing any more.
has_processed_shutdown = False

562 563 564 565 566 567 568 569
def cancelProcessShutdown():
  """
    This method reverts the effect of calling "process_shutdown" on activity
    tool.
  """
  global has_processed_shutdown
  is_running_lock.release()
  has_processed_shutdown = False
570

571
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
572
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
573 574 575 576 577 578 579 580 581 582 583 584
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transactino)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
585 586 587
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
588
    portal_type = 'Activity Tool'
589
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
590 591
    security = ClassSecurityInfo()

592 593
    isIndexable = False

594 595
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
596
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
597
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
598
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
599
                     ,
600
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
601 602 603 604

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

605 606 607
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

608 609
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
610

611 612
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
613

614 615
    distributingNode = ''
    _nodes = ()
616
    activity_creation_trace = False
617
    activity_tracking = False
618
    activity_timing_log = False
619
    cancel_and_invoke_links_hidden = False
620

621 622 623 624 625
    def SQLDict_setPriority(self, **kw):
      real_SQLDict_setPriority = getattr(self.aq_parent, 'SQLDict_setPriority')
      LOG('ActivityTool', 0, real_SQLDict_setPriority(src__=1, **kw))
      return real_SQLDict_setPriority(**kw)

626 627 628
    def __init__(self, id=None):
        if id is None:
          id = ActivityTool.id
629
        Folder.__init__(self, id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
630

631 632 633
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types.
634
        all = Folder.filtered_meta_types(self)
635 636 637 638 639 640
        meta_types = []
        for meta_type in self.all_meta_types():
            if meta_type['name'] in self.allowed_types:
                meta_types.append(meta_type)
        return meta_types

641 642 643 644 645 646 647 648 649 650 651 652 653 654
    def maybeMigrateConnectionClass(self):
      connection_id = 'cmf_activity_sql_connection'
      sql_connection = getattr(self, connection_id, None)
      if (sql_connection is not None and
          not isinstance(sql_connection, ActivityConnection)):
        # SQL Connection migration is needed
        LOG('ActivityTool', WARNING, "Migrating MySQL Connection class")
        parent = aq_parent(aq_inner(sql_connection))
        parent._delObject(sql_connection.getId())
        new_sql_connection = ActivityConnection(connection_id,
                                                sql_connection.title,
                                                sql_connection.connection_string)
        parent._setObject(connection_id, new_sql_connection)

655
    security.declarePrivate('initialize')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
656
    def initialize(self):
657
      self.maybeMigrateConnectionClass()
658 659
      for activity in activity_dict.itervalues():
        activity.initialize(self, clear=False)
660

661 662 663
    def _callSafeFunction(self, batch_function):
      return batch_function()

664 665
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
666 667 668 669 670 671
      """
      return True, if we are subscribed to TimerService.
      Otherwise return False.
      """
      service = getTimerService(self)
      if service:
672
        path = '/'.join(self.getPhysicalPath())
673 674 675
        return path in service.lisSubscriptions()
      LOG('ActivityTool', INFO, 'TimerService not available')
      return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
676

677
    security.declareProtected(Permissions.manage_properties, 'subscribe')
678
    def subscribe(self, REQUEST=None, RESPONSE=None):
679 680
        """ subscribe to the global Timer Service """
        service = getTimerService(self)
681
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
682
        if not service:
683
            LOG('ActivityTool', INFO, 'TimerService not available')
684 685 686 687
            url += urllib.quote('TimerService not available')
        else:
            service.subscribe(self)
            url += urllib.quote("Subscribed to Timer Service")
688 689
        if RESPONSE is not None:
            RESPONSE.redirect(url)
690 691

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
692
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
693 694
        """ unsubscribe from the global Timer Service """
        service = getTimerService(self)
695
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
696
        if not service:
697
            LOG('ActivityTool', INFO, 'TimerService not available')
698 699 700 701
            url += urllib.quote('TimerService not available')
        else:
            service.unsubscribe(self)
            url += urllib.quote("Unsubscribed from Timer Service")
702 703
        if RESPONSE is not None:
            RESPONSE.redirect(url)
704

705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782
    security.declareProtected(Permissions.manage_properties, 'isActivityTrackingEnabled')
    def isActivityTrackingEnabled(self):
      return self.activity_tracking

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTracking')
    def manage_enableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity tracing.
        """
        self.activity_tracking = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Tracking log enabled')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTracking')
    def manage_disableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity tracing.
        """
        self.activity_tracking = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Tracking log disabled')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'isActivityTimingLoggingEnabled')
    def isActivityTimingLoggingEnabled(self):
      return self.activity_timing_log

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTimingLogging')
    def manage_enableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity timing logging.
        """
        self.activity_timing_log = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Timing log enabled')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTimingLogging')
    def manage_disableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity timing logging.
        """
        self.activity_timing_log = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Timing log disabled')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'isActivityCreationTraceEnabled')
    def isActivityCreationTraceEnabled(self):
      return self.activity_creation_trace

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityCreationTrace')
    def manage_enableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity creation trace.
        """
        self.activity_creation_trace = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Activity creation trace enabled')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityCreationTrace')
    def manage_disableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity creation trace.
        """
        self.activity_creation_trace = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Activity creation trace disabled')
          RESPONSE.redirect(url)

783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806
    security.declareProtected(Permissions.manage_properties, 'isCancelAndInvokeLinksHidden')
    def isCancelAndInvokeLinksHidden(self):
      return self.cancel_and_invoke_links_hidden

    security.declareProtected(Permissions.manage_properties, 'manage_hideCancelAndInvokeLinks')
    def manage_hideCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
        """
        self.cancel_and_invoke_links_hidden = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Cancel and invoke links hidden')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_showCancelAndInvokeLinks')
    def manage_showCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
        """
        self.cancel_and_invoke_links_hidden = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Cancel and invoke links visible')
          RESPONSE.redirect(url)

807
    security.declarePrivate('manage_beforeDelete')
808 809
    def manage_beforeDelete(self, item, container):
        self.unsubscribe()
810
        Folder.inheritedAttribute('manage_beforeDelete')(self, item, container)
811 812
    
    security.declarePrivate('manage_afterAdd')
813 814
    def manage_afterAdd(self, item, container):
        self.subscribe()
815
        Folder.inheritedAttribute('manage_afterAdd')(self, item, container)
816

817
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getServerAddress')
818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837
    def getServerAddress(self):
        """
        Backward-compatibility code only.
        """
        global _server_address
        if _server_address is None:
            ip = port = ''
            from asyncore import socket_map
            for k, v in socket_map.items():
                if hasattr(v, 'addr'):
                    # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
                    type = str(getattr(v, '__class__', 'unknown'))
                    if type == 'ZServer.HTTPServer.zhttp_server':
                        ip, port = v.addr
                        break
            if ip == '0.0.0.0':
                ip = socket.gethostbyname(socket.gethostname())
            _server_address = '%s:%s' %(ip, port)
        return _server_address

838
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getCurrentNode')
839
    def getCurrentNode(self):
840
        """ Return current node identifier """
841 842
        global currentNode
        if currentNode is None:
843 844 845 846 847 848 849 850
          currentNode = getattr(
            getConfiguration(),
            'product_config',
            {},
          ).get('cmfactivity', {}).get('node-id')
        if currentNode is None:
          warnings.warn('Node name auto-generation is deprecated, please add a'
            '\n'
851
            '<product-config CMFActivity>\n'
852 853 854 855
            '  node-id = ...\n'
            '</product-config>\n'
            'section in your zope.conf, replacing "..." with a cluster-unique '
            'node identifier.', DeprecationWarning)
856
          currentNode = self.getServerAddress()
857
        return currentNode
858

859
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getDistributingNode')
860 861 862 863
    def getDistributingNode(self):
        """ Return the distributingNode """
        return self.distributingNode

864 865 866 867 868 869 870 871 872 873 874 875 876
    def getNodeList(self, role=None):
      node_dict = self.getNodeDict()
      if role is None:
        result = [x for x in node_dict.keys()]
      else:
        result = [node_id for node_id, node_role in node_dict.items() if node_role == role]
      result.sort()
      return result

    def getNodeDict(self):
      nodes = self._nodes
      if isinstance(nodes, tuple):
        new_nodes = OIBTree()
877
        new_nodes.update([(x, ROLE_PROCESSING) for x in nodes])
878 879 880 881 882
        self._nodes = nodes = new_nodes
      return nodes

    def registerNode(self, node):
      node_dict = self.getNodeDict()
883 884 885 886 887 888 889 890 891 892 893
      if node not in node_dict:
        if node_dict:
          # BBB: check if our node was known by address (processing and/or
          # distribution), and migrate it.
          server_address = self.getServerAddress()
          role = node_dict.pop(server_address, ROLE_IDLE)
          if self.distributingNode == server_address:
            self.distributingNode = node
        else:
          # We are registering the first node, make
          # it both the distributing node and a processing node.
894 895 896 897 898 899 900 901 902 903 904 905
          role = ROLE_PROCESSING
          self.distributingNode = node
        self.updateNode(node, role)

    def updateNode(self, node, role):
      node_dict = self.getNodeDict()
      node_dict[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      return self.getNodeList(role=ROLE_PROCESSING)

906
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
907 908
    def getIdleNodeList(self):
      return self.getNodeList(role=ROLE_IDLE)
909

910 911
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name"""
912
      return isinstance(node_name, str)
913

914
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_setDistributingNode')
915
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
916
        """ set the distributing node """
917
        if not distributingNode or self._isValidNodeName(distributingNode):
918 919 920 921 922 923 924 925 926 927 928 929 930
          self.distributingNode = distributingNode
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
                  urllib.quote("Distributing Node successfully changed."))
        else :
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
                  urllib.quote("Malformed Distributing Node."))

931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """ delete selected unused nodes """
      processing_node = self.getDistributingNode()
      updated_processing_node = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == processing_node:
            self.processing_node = ''
            updated_processing_node = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if updated_processing_node:
          message += "Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """ Change one or more idle nodes into processing nodes """
      if unused_node_list is not None:
        for node in unused_node_list:
          self.updateNode(node, ROLE_PROCESSING)
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing done."
        else:
          message = "Nodes now procesing: %r." % (unused_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """ Change one or more procesing nodes into idle nodes """
      if processing_node_list is not None:
        for node in processing_node_list:
          self.updateNode(node, ROLE_IDLE)
      if REQUEST is not None:
        if processing_node_list is None:
          message = "No used node selected, nothing done."
        else:
          message = "Nodes now unused %r." % (processing_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))
987

988
    security.declarePrivate('process_shutdown')
989 990 991 992 993
    def process_shutdown(self, phase, time_in_phase):
        """
          Prevent shutdown from happening while an activity queue is
          processing a batch.
        """
994
        global has_processed_shutdown
995 996
        if phase == 3 and not has_processed_shutdown:
          has_processed_shutdown = True
997 998 999 1000
          LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
          is_running_lock.acquire()
          LOG('CMFActivity', INFO, "Shutdown: Activities finished.")

1001
    security.declareProtected(CMFCorePermissions.ManagePortal, 'process_timer')
1002
    def process_timer(self, tick, interval, prev="", next=""):
1003 1004 1005 1006 1007 1008 1009 1010
      """
      Call distribute() if we are the Distributing Node and call tic()
      with our node number.
      This method is called by TimerService in the interval given
      in zope.conf. The Default is every 5 seconds.
      """
      # Prevent TimerService from starting multiple threads in parallel
      if timerservice_lock.acquire(0):
1011
        try:
1012 1013 1014 1015 1016 1017
          # make sure our skin is set-up. On CMF 1.5 it's setup by acquisition,
          # but on 2.2 it's by traversal, and our site probably wasn't traversed
          # by the timerserver request, which goes into the Zope Control_Panel
          # calling it a second time is a harmless and cheap no-op.
          # both setupCurrentSkin and REQUEST are acquired from containers.
          self.setupCurrentSkin(self.REQUEST)
1018
          old_sm = getSecurityManager()
1019
          try:
1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036
            # get owner of portal_catalog, so normally we should be able to
            # have the permission to invoke all activities
            user = self.portal_catalog.getWrappedOwner()
            newSecurityManager(self.REQUEST, user)

            currentNode = self.getCurrentNode()
            self.registerNode(currentNode)
            processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

            # only distribute when we are the distributingNode
            if self.getDistributingNode() == currentNode:
              self.distribute(len(processing_node_list))

            # SkinsTool uses a REQUEST cache to store skin objects, as
            # with TimerService we have the same REQUEST over multiple
            # portals, we clear this cache to make sure the cache doesn't
            # contains skins from another portal.
1037
            try:
1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
              self.getPortalObject().portal_skins.changeSkin(None)
            except AttributeError:
              pass

            # call tic for the current processing_node
            # the processing_node numbers are the indices of the elements
            # in the node tuple +1 because processing_node starts form 1
            if currentNode in processing_node_list:
              self.tic(processing_node_list.index(currentNode) + 1)
          except:
            # Catch ALL exception to avoid killing timerserver.
            LOG('ActivityTool', ERROR, 'process_timer received an exception',
                error=sys.exc_info())
1051 1052
          finally:
            setSecurityManager(old_sm)
Jérome Perrin's avatar
Jérome Perrin committed
1053
        finally:
1054
          timerservice_lock.release()
1055

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1056 1057 1058 1059 1060 1061
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Call distribute on each queue
1062
      for activity in activity_dict.itervalues():
1063
        activity.distribute(aq_inner(self), node_count)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1064

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1065
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1066
    def tic(self, processing_node=1, force=0):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1067 1068
      """
        Starts again an activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1069
        processing_node starts from 1 (there is not node 0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1070
      """
1071
      global active_threads
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1072 1073

      # return if the number of threads is too high
1074
      # else, increase the number of active_threads and continue
1075 1076
      tic_lock.acquire()
      too_many_threads = (active_threads >= max_active_threads)
1077
      if not too_many_threads or force:
1078
        active_threads += 1
1079 1080 1081
      else:
        tic_lock.release()
        raise RuntimeError, 'Too many threads'
1082
      tic_lock.release()
1083

1084
      inner_self = aq_inner(self)
1085

1086
      try:
1087
        # Loop as long as there are activities. Always process the queue with
1088 1089
        # "highest" priority. If several queues have same highest priority,
        # use a round-robin algorithm.
1090 1091
        # XXX: We always finish by iterating over all queues, in case that
        #      getPriority does not see messages dequeueMessage would process.
1092
        activity_list = activity_dict.values()
1093
        def sort_key(activity):
1094
          return activity.getPriority(self)
1095 1096
        while is_running_lock.acquire(0):
          try:
1097 1098
            activity_list.sort(key=sort_key) # stable sort
            for i, activity in enumerate(activity_list):
1099
              # Transaction processing is the responsability of the activity
1100 1101
              if not activity.dequeueMessage(inner_self, processing_node):
                activity_list.append(activity_list.pop(i))
1102 1103 1104 1105 1106
                break
            else:
              break
          finally:
            is_running_lock.release()
1107 1108 1109 1110 1111
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        active_threads -= 1
        tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1112

1113
    def hasActivity(self, *args, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1114
      # Check in each queue if the object has deferred tasks
1115 1116
      # if not argument is provided, then check on self
      if len(args) > 0:
1117
        obj = args[0]
1118
      else:
1119
        obj = self
1120
      for activity in activity_dict.itervalues():
1121
        if activity.hasActivity(aq_inner(self), obj, **kw):
1122 1123
          return True
      return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1124

1125
    security.declarePrivate('getActivityBuffer')
1126 1127 1128 1129 1130 1131 1132 1133
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get activtity buffer for this thread for this activity tool.
        If no activity buffer is found at lowest level and create_if_not_found
        is True, create one.
        Intermediate level is unconditionaly created if non existant because
        chances are it will be used in the instance life.
      """
1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148
      # XXX: using a volatile attribute to cache getPhysicalPath result.
      # This cache may need invalidation if all the following is
      # simultaneously true:
      # - ActivityTool instances can be moved in object tree
      # - moved instance is used to get access to its activity buffer
      # - another instance is put in the place of the original, and used to
      #   access its activity buffer
      # ...which seems currently unlikely, and as such is left out.
      try:
        my_instance_key = self._v_physical_path
      except AttributeError:
        # Safeguard: make sure we are wrapped in acquisition context before
        # using our path as an activity tool instance-wide identifier.
        assert getattr(self, 'aq_self', None) is not None
        self._v_physical_path = my_instance_key = self.getPhysicalPath()
1149
      thread_activity_buffer = global_activity_buffer[my_instance_key]
1150
      my_thread_key = get_ident()
1151 1152 1153
      try:
        return thread_activity_buffer[my_thread_key]
      except KeyError:
1154
        if create_if_not_found:
1155
          buffer = ActivityBuffer()
1156 1157 1158
        else:
          buffer = None
        thread_activity_buffer[my_thread_key] = buffer
1159
        return buffer
1160

1161
    def activateObject(self, object, activity=DEFAULT_ACTIVITY, active_process=None, **kw):
1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184
      if active_process is None:
        active_process_uid = None
      elif isinstance(active_process, str):
        # TODO: deprecate
        active_process_uid = self.unrestrictedTraverse(active_process).getUid()
      else:
        active_process_uid = active_process.getUid()
        active_process = active_process.getPhysicalPath()
      if isinstance(object, str):
        oid = None
        url = tuple(object.split('/'))
      else:
        try:
          oid = aq_base(object)._p_oid
          # Note that it's too early to get the OID of a newly created object,
          # so at this point, self.oid may still be None.
        except AttributeError:
          pass
        url = object.getPhysicalPath()
      if kw.get('serialization_tag', False) is None:
        del kw['serialization_tag']
      return ActiveWrapper(self, url, oid, activity,
                           active_process, active_process_uid, kw,
1185
                           getattr(self, 'REQUEST', None))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1186

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1187
    def getRegisteredMessageList(self, activity):
1188
      activity_buffer = self.getActivityBuffer(create_if_not_found=False)
1189
      if activity_buffer is not None:
1190 1191
        #activity_buffer._register() # This is required if flush flush is called outside activate
        return activity.getRegisteredMessageList(activity_buffer,
1192
                                                 aq_inner(self))
1193 1194
      else:
        return []
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1195

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1196
    def unregisterMessage(self, activity, message):
1197 1198 1199
      activity_buffer = self.getActivityBuffer()
      #activity_buffer._register()
      return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1200

1201
    def flush(self, obj, invoke=0, **kw):
1202
      self.getActivityBuffer()
1203 1204
      if isinstance(obj, tuple):
        object_path = obj
1205
      else:
1206
        object_path = obj.getPhysicalPath()
1207
      for activity in activity_dict.itervalues():
1208
        activity.flush(aq_inner(self), object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1209 1210

    def invoke(self, message):
1211
      if self.activity_tracking:
1212
        activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
1213
      old_localizer_context = False
1214 1215
      if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extrach base objects.
1216
        base_chain = [aq_base(x) for x in self.aq_chain]
1217 1218 1219
        # Grab existig request (last chain item) and create a copy.
        request_container = base_chain.pop()
        request = request_container.REQUEST
1220 1221 1222 1223 1224 1225 1226 1227
        # Generate PARENTS value. Sadly, we cannot reuse base_chain since
        # PARENTS items must be wrapped in acquisition
        parents = []
        application = self.getPhysicalRoot().aq_base
        for parent in self.aq_chain:
          if parent.aq_base is application:
            break
          parents.append(parent)
1228 1229
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # runing unit tests. Recreate it if it does not exist.
1230 1231
        if getattr(request.other, 'PARENTS', None) is None:
          request.other['PARENTS'] = parents
1232
        # XXX: PATH_INFO might not be set when runing unit tests.
1233
        if request.environ.get('PATH_INFO') is None:
1234
          request.environ['PATH_INFO'] = '/Control_Panel/timer_service/process_timer'
1235

1236 1237
        # restore request information
        new_request = request.clone()
1238
        request_info = message.request_info
1239 1240
        # PARENTS is truncated by clone
        new_request.other['PARENTS'] = parents
1241 1242
        if '_script' in request_info:
          new_request._script = request_info['_script']
1243
        if 'SERVER_URL' in request_info:
1244
          new_request.other['SERVER_URL'] = request_info['SERVER_URL']
1245 1246 1247
        if 'VirtualRootPhysicalPath' in request_info:
          new_request.other['VirtualRootPhysicalPath'] = request_info['VirtualRootPhysicalPath']
        if 'HTTP_ACCEPT_LANGUAGE' in request_info:
1248
          new_request.environ['HTTP_ACCEPT_LANGUAGE'] = request_info['HTTP_ACCEPT_LANGUAGE']
1249 1250
          # Replace Localizer/iHotfix Context, saving existing one
          localizer_context = LocalizerContext(new_request)
1251
          id = get_ident()
1252
          localizer_lock.acquire()
1253
          try:
1254 1255
            old_localizer_context = localizer_contexts.get(id)
            localizer_contexts[id] = localizer_context
1256
          finally:
1257 1258
            localizer_lock.release()
          # Execute Localizer/iHotfix "patch 2"
1259
          new_request.processInputs()
1260 1261

        new_request_container = request_container.__class__(REQUEST=new_request)
1262 1263 1264 1265 1266 1267 1268 1269
        # Recreate acquisition chain.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
1270 1271 1272
      try:
        message(my_self)
      finally:
1273 1274 1275 1276
        if my_self is not self: # We rewrapped self
          # Restore default skin selection
          skinnable = self.getPortalObject()
          skinnable.changeSkin(skinnable.getSkinNameFromRequest(request))
1277 1278
        if old_localizer_context is not False:
          # Restore Localizer/iHotfix context
1279
          id = get_ident()
1280
          localizer_lock.acquire()
1281
          try:
1282 1283
            if old_localizer_context is None:
              del localizer_contexts[id]
1284
            else:
1285
              localizer_contexts[id] = old_localizer_context
1286
          finally:
1287
            localizer_lock.release()
1288
      if self.activity_tracking:
1289
        activity_tracking_logger.info('invoked message')
1290 1291 1292
      if my_self is not self: # We rewrapped self
        for held in my_self.REQUEST._held:
          self.REQUEST._hold(held)
1293

1294
    def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
1295
      if self.activity_tracking:
1296 1297 1298
        activity_tracking_logger.info(
          'invoking group messages: method_id=%s, paths=%s'
          % (method_id, ['/'.join(m.object_path) for m in message_list]))
1299
      # Invoke a group method.
1300
      message_dict = {}
1301
      path_set = set()
1302 1303 1304
      # Filter the list of messages. If an object is not available, mark its
      # message as non-executable. In addition, expand an object if necessary,
      # and make sure that no duplication happens.
1305
      for m in message_list:
1306 1307
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
1308
        try:
1309 1310 1311
          object_list = m.getObjectList(self)
          if object_list is None:
            continue
1312
          message_dict[m] = expanded_object_list = []
1313
          for subobj in object_list:
1314 1315 1316 1317
            if merge_duplicate:
              path = subobj.getPath()
              if path in path_set:
                continue
1318
              path_set.add(path)
1319 1320 1321 1322 1323 1324 1325 1326 1327 1328
            if alternate_method_id is not None \
               and hasattr(aq_base(subobj), alternate_method_id):
              # if this object is alternated,
              # generate a new single active object
              activity_kw = m.activity_kw.copy()
              activity_kw.pop('group_method_id', None)
              activity_kw.pop('group_id', None)
              active_obj = subobj.activate(activity=activity, **activity_kw)
              getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
            else:
1329
              expanded_object_list.append(GroupedMessage(subobj, m))
1330
        except:
1331
          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
1332

1333
      expanded_object_list = sum(message_dict.itervalues(), [])
1334
      try:
1335
        if expanded_object_list:
1336 1337
          # Store site info
          setSite(self.getParentValue())
1338
          traverse = self.getPortalObject().unrestrictedTraverse
1339
          # FIXME: how to apply security here?
1340
          # NOTE: The callee must update each processed item of
1341 1342 1343 1344 1345
          #       expanded_object_list, by setting:
          #       - 'exc_info' in case of error
          #       - 'result' otherwise, with None or the result to post
          #          on the active process
          #       Skipped item must not be touched.
1346
          traverse(method_id)(expanded_object_list)
1347 1348
      except:
        # In this case, the group method completely failed.
1349
        exc_info = sys.exc_info()
1350
        for m in message_dict:
1351
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
1352
        LOG('WARNING ActivityTool', 0,
1353
            'Could not call method %s on objects %s' %
1354 1355
            (method_id, [x.object for x in expanded_object_list]),
            error=exc_info)
1356 1357 1358
        error_log = getattr(self, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
1359
      else:
1360 1361 1362 1363
        # Note there can be partial failures.
        for m, expanded_object_list in message_dict.iteritems():
          result_list = []
          for result in expanded_object_list:
1364 1365 1366 1367 1368 1369
            try:
              if result.result is not None:
                result_list.append(result)
            except AttributeError:
              exc_info = getattr(result, "exc_info", (SkippedMessage,))
              break # failed or skipped message
1370 1371
          else:
            try:
1372 1373 1374
              if result_list and m.active_process:
                active_process = traverse(m.active_process)
                for result in result_list:
1375
                  m.activateResult(active_process, result.result, result.object)
1376
            except:
1377
              exc_info = None
1378
            else:
1379
              m.setExecutionState(MESSAGE_EXECUTED, context=self)
1380
              continue
1381
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, context=self)
1382
      if self.activity_tracking:
1383
        activity_tracking_logger.info('invoked group messages')
1384

1385 1386 1387 1388
    security.declarePrivate('dummyGroupMethod')
    class dummyGroupMethod(object):
      def __bobo_traverse__(self, REQUEST, method_id):
        def group_method(message_list):
1389 1390
          user_name = None
          sm = getSecurityManager()
1391 1392
          try:
            for m in message_list:
1393 1394 1395 1396
              message = m._message
              if user_name != message.user_name:
                user_name = message.user_name
                message.changeUser(user_name, m.object)
1397
              m.result = getattr(m.object, method_id)(*m.args, **m.kw)
1398
          except Exception:
1399
            m.raised()
1400 1401
          finally:
            setSecurityManager(sm)
1402 1403 1404
        return group_method
    dummyGroupMethod = dummyGroupMethod()

1405 1406
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
1407
      # Some Security Cheking should be made here XXX
1408
      self.getActivityBuffer()
1409
      activity_dict[activity].queueMessage(aq_inner(self),
1410 1411
        Message(path, active_process, activity_kw, method_id, args, kw,
          portal_activities=self))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1412

1413
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1414 1415 1416 1417 1418 1419
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
1420
      self.flush(object_path,method_id=method_id,invoke=1)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1421
      if REQUEST is not None:
1422 1423
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1424

1425
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageRestart')
1426 1427 1428 1429
    def manageRestart(self, message_uid_list, activity, REQUEST=None):
      """
        Restart one or several messages
      """
Sebastien Robin's avatar
Sebastien Robin committed
1430 1431 1432 1433
      if not(isinstance(message_uid_list, list)):
        message_uid_list = [message_uid_list]
      self.SQLBase_makeMessageListAvailable(table=activity_dict[activity].sql_table,
                              uid=message_uid_list)
1434 1435 1436 1437
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (
          self.absolute_url(), 'view'))

1438
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"
      """
      LOG('ActivityTool', WARNING,
          '"manageCancel" method is deprecated, use "manageDelete" instead.')
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path,method_id=method_id,invoke=0)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (
1450
          self.absolute_url(), 'manageActivities'))
1451 1452 1453 1454 1455 1456

    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageDelete' )
    def manageDelete(self, message_uid_list, activity, REQUEST=None):
      """
        Delete one or several messages
      """
Sebastien Robin's avatar
Sebastien Robin committed
1457 1458 1459 1460
      if not(isinstance(message_uid_list, list)):
        message_uid_list = [message_uid_list]
      self.SQLBase_delMessage(table=activity_dict[activity].sql_table,
                              uid=message_uid_list)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1461
      if REQUEST is not None:
1462 1463
        return REQUEST.RESPONSE.redirect('%s/%s' % (
          self.absolute_url(), 'view'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1464

1465 1466
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
1467
    def manageClearActivities(self, keep=1, RESPONSE=None):
1468
      """
1469
        Recreate tables, clearing all activities
1470
      """
1471 1472
      for activity in activity_dict.itervalues():
        activity.initialize(self, clear=True)
1473

1474 1475 1476
      if RESPONSE is not None:
        return RESPONSE.redirect(self.absolute_url_path() +
          '/manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared')
1477

1478 1479 1480 1481 1482 1483 1484 1485 1486
    security.declarePublic('getMessageTempObjectList')
    def getMessageTempObjectList(self, **kw):
      """
        Get object list of messages waiting in queues
      """
      message_list = self.getMessageList(**kw)
      object_list = []
      for sql_message in message_list:
        message = self.newContent(temp_object=1)
1487
        message.__dict__.update(**sql_message.__dict__)
1488 1489 1490
        object_list.append(message)
      return object_list

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1491
    security.declarePublic('getMessageList')
1492
    def getMessageList(self, activity=None, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1493 1494 1495
      """
        List messages waiting in queues
      """
1496 1497
      if activity:
        return activity_dict[activity].getMessageList(aq_inner(self), **kw)
1498

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1499
      message_list = []
1500
      for activity in activity_dict.itervalues():
Sebastien Robin's avatar
Sebastien Robin committed
1501
        try:
1502
          message_list += activity.getMessageList(aq_inner(self), **kw)
Sebastien Robin's avatar
Sebastien Robin committed
1503 1504
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,activity)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1505 1506
      return message_list

1507 1508 1509 1510 1511 1512
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag.
      """
      message_count = 0
1513
      for activity in activity_dict.itervalues():
1514
        message_count += activity.countMessageWithTag(aq_inner(self), value)
Sebastien Robin's avatar
Sebastien Robin committed
1515 1516 1517 1518 1519 1520 1521 1522 1523 1524
      return message_count

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameter.

        Parameters allowed:

        method_id : the id of the method
Jérome Perrin's avatar
Jérome Perrin committed
1525
        path : for activities on a particular object
Sebastien Robin's avatar
Sebastien Robin committed
1526 1527 1528 1529
        tag : activities with a particular tag
        message_uid : activities with a particular uid
      """
      message_count = 0
1530
      for activity in activity_dict.itervalues():
1531
        message_count += activity.countMessage(aq_inner(self), **kw)
1532 1533
      return message_count

1534
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1535
    def newActiveProcess(self, REQUEST=None, **kw):
1536 1537
      # note: if one wants to create an Actice Process without ERP5 products,
      # she can call ActiveProcess.addActiveProcess
1538
      obj = self.newContent(portal_type="Active Process", **kw)
1539 1540 1541
      if REQUEST is not None:
        REQUEST['RESPONSE'].redirect( 'manage_main' )
      return obj
1542

1543
    # Active synchronisation methods
1544
    security.declarePrivate('validateOrder')
1545
    def validateOrder(self, message, validator_id, validation_value):
1546 1547 1548 1549 1550 1551
      message_list = self.getDependentMessageList(message, validator_id, validation_value)
      return len(message_list) > 0

    security.declarePrivate('getDependentMessageList')
    def getDependentMessageList(self, message, validator_id, validation_value):
      message_list = []
1552
      method_id = "_validate_" + validator_id
1553
      for activity in activity_dict.itervalues():
1554 1555 1556 1557
        method = getattr(activity, method_id, None)
        if method is not None:
          result = method(aq_inner(self), message, validation_value)
          if result:
1558
            message_list += [(activity, m) for m in result]
1559
      return message_list
1560

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1561 1562
    # Required for tests (time shift)
    def timeShift(self, delay):
1563
      for activity in activity_dict.itervalues():
1564
        activity.timeShift(aq_inner(self), delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1565

1566
InitializeClass(ActivityTool)