ActivityTool.py 61.1 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32
import socket
import urllib
import threading
import sys
Vincent Pelletier's avatar
Vincent Pelletier committed
33
from types import StringType
34
from collections import defaultdict
35
from cPickle import dumps, loads
36
from Products.CMFCore import permissions as CMFCorePermissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.ERP5Type.Core.Folder import Folder
38
from Products.CMFActivity.ActiveResult import ActiveResult
39
from Products.CMFActivity.ActiveObject import DEFAULT_ACTIVITY
40
from Products.CMFActivity.ActivityConnection import ActivityConnection
41
from Products.PythonScripts.Utility import allow_class
42
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
43 44 45 46
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
47
from AccessControl.User import system as system_user
48
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser
49
from Products.ERP5Type.Globals import InitializeClass, DTMLFile
50
from Acquisition import aq_base, aq_inner, aq_parent
51
from ActivityBuffer import ActivityBuffer
52
from ActivityRuntimeEnvironment import BaseMessage
53
from zExceptions import ExceptionFormatter
54
from BTrees.OIBTree import OIBTree
55 56
from Zope2 import app
from Products.ERP5Type.UnrestrictedMethod import PrivilegedUser
57
from zope.site.hooks import setSite
58
import transaction
59
from App.config import getConfiguration
60

61 62 63 64
import Products.Localizer.patches
localizer_lock = Products.Localizer.patches._requests_lock
localizer_contexts = Products.Localizer.patches._requests
LocalizerContext = lambda request: request
65

66

67
from ZODB.POSException import ConflictError
68
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
69

70
from zLOG import LOG, INFO, WARNING, ERROR
71
import warnings
72
from time import time
73 74

try:
  from Products.TimerService import getTimerService
except ImportError:
  def getTimerService(self):
    """Stand-in used when Products.TimerService cannot be imported.

    Always returns None, so callers that test the returned service for
    truthiness treat the timer service as unavailable.
    """
    return None
Jean-Paul Smets's avatar
Jean-Paul Smets committed
79

80
from traceback import format_list, extract_stack
81

Jean-Paul Smets's avatar
Jean-Paul Smets committed
82 83 84 85
# Keeping this state in RAM (module-level variables, not properties of an
# instance) avoids storing it in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
86 87
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
88
is_running_lock = threading.Lock()
89
currentNode = None
90
_server_address = None
91 92
ROLE_IDLE = 0
ROLE_PROCESSING = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
93

94 95 96 97 98
# Logging channel definitions
import logging
# Main logging channel
activity_logger = logging.getLogger('CMFActivity')
# Some logging subchannels
99
activity_tracking_logger = logging.getLogger('Tracking')
100
activity_timing_logger = logging.getLogger('CMFActivity.TimingLog')
101 102 103 104 105 106 107 108 109 110

# Direct logging to "[instancehome]/log/CMFActivity.log", if this directory exists.
# Otherwise, it will end up in root logging facility (ie, event.log).
from App.config import getConfiguration
import os
# Only attach a dedicated file handler when the instance home is known and
# already contains a "log" directory; otherwise the activity logger is left
# untouched.
instancehome = getConfiguration().instancehome
if instancehome is not None:
  log_directory = os.path.join(instancehome, 'log')
  if os.path.isdir(log_directory):
    from Signals import Signals
    from ZConfig.components.logger.loghandler import FileHandler
    log_file_handler = FileHandler(
      os.path.join(log_directory, 'CMFActivity.log'))
    # Default zope log format string borrowed from
    # ZConfig/components/logger/factory.xml, but without the extra "------"
    # line separating entries.
    log_file_handler.setFormatter(logging.Formatter(
      "%(asctime)s %(levelname)s %(name)s %(message)s",
      "%Y-%m-%dT%H:%M:%S"))
    # NOTE(review): presumably lets Zope's signal handling reopen this file
    # (log rotation) - confirm against Signals.registerZopeSignals.
    Signals.registerZopeSignals([log_file_handler])
    activity_logger.addHandler(log_file_handler)
    # Entries written to the dedicated file are not forwarded to the parent
    # loggers.
    activity_logger.propagate = 0

121 122 123 124 125 126 127 128
def activity_timing_method(method, args, kw):
  """Call ``method(*args, **kw)`` and log how long the call took.

  The duration is written to ``activity_timing_logger`` whether the call
  returns or raises; the return value (or the exception) of ``method`` is
  passed through unchanged.
  """
  started_at = time()
  try:
    return method(*args, **kw)
  finally:
    elapsed = time() - started_at
    activity_timing_logger.info(
      '%.02fs: %r(*%r, **%r)' % (elapsed, method, args, kw))

129 130 131
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
132 133
global_activity_buffer = defaultdict(dict)
from thread import get_ident
134

135 136 137 138
MESSAGE_NOT_EXECUTED = 0
MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2

139

140 141 142 143
class SkippedMessage(Exception):
  """Exception type singled out by Message.setExecutionState.

  When a message is flagged MESSAGE_NOT_EXECUTED and the exception type is
  SkippedMessage, setExecutionState returns without logging anything and
  without storing a traceback.
  """


144
class Message(BaseMessage):
145
  """Activity Message Class.
146

147 148
  Message instances are stored in an activity queue, inside the Activity Tool.
  """
149

150
  active_process = None
151
  active_process_uid = None
152
  call_traceback = None
153
  exc_info = None
154 155 156
  is_executed = MESSAGE_NOT_EXECUTED
  processing = None
  traceback = None
157
  oid = None
158
  is_registered = False
159

160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
  def __init__(
      self,
      url,
      oid,
      active_process,
      active_process_uid,
      activity_kw,
      method_id,
      args, kw,
      request=None,
      portal_activities=None,
    ):
    """Record everything needed to call ``method_id`` later.

    url -- stored as ``object_path``; other methods of this class treat it
           as a sequence of path elements.
    oid -- stored as ``oid``; getObject() compares it with the ``_p_oid`` of
           the object found at ``object_path``.
    active_process, active_process_uid, activity_kw, method_id, args, kw --
           stored unchanged under the same names.
    request -- optional; when given, a few of its values are copied into
           ``request_info``.
    portal_activities -- optional; when its ``activity_creation_trace``
           attribute is true, the creation stack is saved in
           ``call_traceback``.
    """
    self.object_path = url
    self.oid = oid
    self.active_process = active_process
    self.active_process_uid = active_process_uid
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
    if getattr(portal_activities, 'activity_creation_trace', False):
      # Save current traceback, to make it possible to tell where a message
      # was generated.
      # Strip last stack entry, since it will always be the same.
      creation_stack = extract_stack()[:-1]
      self.call_traceback = ''.join(format_list(creation_stack))
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info
    request_info = self.request_info = {}
    if request is not None:
      other = request.other
      for key in ('SERVER_URL', 'VirtualRootPhysicalPath'):
        if key in other:
          request_info[key] = other[key]
      environ = request.environ
      if 'HTTP_ACCEPT_LANGUAGE' in environ:
        request_info['HTTP_ACCEPT_LANGUAGE'] = environ['HTTP_ACCEPT_LANGUAGE']
      request_info['_script'] = list(request._script)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
198

199 200 201 202 203 204 205 206
  @staticmethod
  def load(s, **kw):
    self = loads(s)
    self.__dict__.update(kw)
    return self

  dump = dumps

207 208 209 210 211 212 213
  def getGroupId(self):
    """Return the grouping key of this message.

    The key is the ``group_method_id`` activity parameter (empty string when
    absent), a NUL character, then the ``group_id`` activity parameter
    (empty string when absent). A ``group_method_id`` explicitly set to None
    is replaced by 'portal_activities/dummyGroupMethod/' followed by the
    method id.
    """
    activity_kw = self.activity_kw
    prefix = activity_kw.get('group_method_id', '')
    if prefix is None:
      prefix = 'portal_activities/dummyGroupMethod/' + self.method_id
    suffix = activity_kw.get('group_id', '')
    return prefix + '\0' + suffix

214 215 216 217 218 219
  def _getObject(self, activity_tool):
    obj = activity_tool.getPhysicalRoot()
    for id in self.object_path[1:]:
      obj = obj[id]
    return obj

220
  def getObject(self, activity_tool):
    """Return the object referenced in this message.

    When the path cannot be resolved (KeyError), a warning is logged, the
    message is flagged MESSAGE_NOT_EXECUTABLE and None is returned.
    Raises ValueError when the message carries an OID that differs from the
    ``_p_oid`` of the object found, unless 'portal_workflow' is one of the
    path elements.
    """
    try:
      obj = self._getObject(activity_tool)
    except KeyError:
      LOG('CMFActivity', WARNING, "Message dropped (no object found at path %r)"
          % (self.object_path,), error=sys.exc_info())
      self.setExecutionState(MESSAGE_NOT_EXECUTABLE)
      return None
    if self.oid and self.oid != getattr(aq_base(obj), '_p_oid', None):
      # XXX: BusinessTemplate must be fixed to preserve OID
      if 'portal_workflow' not in self.object_path:
        raise ValueError("OID mismatch for %r" % obj)
    return obj

  def getObjectList(self, activity_tool):
    """Return the objects this message can be expanded to.

    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls).

    Returns an empty tuple when getObject() yields None, the result of the
    method named by the ``expand_method_id`` activity parameter when that
    parameter is present, and a one-element tuple holding the object
    otherwise.
    """
    target = self.getObject(activity_tool)
    if target is None:
      return ()
    try:
      expand_method_id = self.activity_kw['expand_method_id']
    except KeyError:
      return (target,)
    return getattr(target, expand_method_id)()

  def getObjectCount(self, activity_tool):
    """Return how many objects this message expands to.

    Without an ``expand_method_id`` activity parameter the answer is 1.
    Otherwise it is the length of what the expand method returns; if
    resolving the object or expanding it raises a StandardError, 1 is
    returned instead.
    """
    if 'expand_method_id' not in self.activity_kw:
      return 1
    try:
      target = self._getObject(activity_tool)
      expand = getattr(target, self.activity_kw['expand_method_id'])
      return len(expand())
    except StandardError:
      # Best effort: fall back to counting the message as a single object.
      return 1
256

257
  def changeUser(self, user_name, activity_tool):
    """Restore the security context for the calling user.

    The user is looked up by id in the portal's acl_users, then in the
    acl_users of the portal's parent. When still not found and ``user_name``
    is the system user's name, a PrivilegedUser holding every valid role of
    the portal's user folder is built instead.

    Returns the user (wrapped in the user folder it was found in) after
    installing a new security manager for it, or None - after clearing the
    security manager and logging a warning - when no user was found.
    """
    portal = activity_tool.getPortalObject()
    portal_user_folder = portal.acl_users
    user_folder = portal_user_folder
    user = user_folder.getUserById(user_name)
    # if the user is not found, try to get it from a parent acl_users
    # XXX this is still far from perfect, because we need to store all
    # information about the user (like original user folder, roles) to
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
    if user is None:
      user_folder = portal.aq_parent.acl_users
      user = user_folder.getUserById(user_name)
      if user is None and user_name == system_user.getUserName():
        # The following logic partly comes from unrestricted_apply()
        # implementation in ERP5Type.UnrestrictedMethod but we get roles
        # from the portal to have more roles.
        user_folder = portal_user_folder
        role_list = user_folder.valid_roles()
        user = PrivilegedUser(
          user_name, None, role_list, ()).__of__(user_folder)
    if user is None:
      LOG("CMFActivity", WARNING,
          "Unable to find user %r in the portal" % user_name)
      noSecurityManager()
      return None
    user = user.__of__(user_folder)
    newSecurityManager(None, user)
    transaction.get().setUser(
      user_name, '/'.join(user_folder.getPhysicalPath()))
    return user

288 289 290 291 292
  def activateResult(self, active_process, result, object):
    """Post ``result`` to ``active_process``.

    ``result`` is wrapped in an ActiveResult unless it already is one; the
    ActiveResult is then edited with ``object`` as its object_path and this
    message's method id before being posted.
    """
    if isinstance(result, ActiveResult):
      active_result = result
    else:
      active_result = ActiveResult(result=result)
    # XXX Allow other method_id in future
    active_result.edit(object_path=object, method_id=self.method_id)
    active_process.postResult(active_result)
294

Jean-Paul Smets's avatar
Jean-Paul Smets committed
295
  def __call__(self, activity_tool):
    """Execute the message against the object it refers to.

    The method is called under the security context set up by changeUser();
    the previous security manager is always restored afterwards. A non-None
    result is posted to the active process when one is set, and the message
    is flagged MESSAGE_EXECUTED. Anything raised along the way is caught and
    the message is flagged MESSAGE_NOT_EXECUTED instead. Nothing is done
    when getObject() returns None.
    """
    try:
      obj = self.getObject(activity_tool)
      if obj is None:
        return
      previous_security_manager = getSecurityManager()
      try:
        # Change user if required (TO BE DONE)
        # We will change the user only in order to execute this method
        self.changeUser(self.user_name, activity_tool)
        # XXX: There is no check to see if user is allowed to access
        #      that method !
        method = getattr(obj, self.method_id)
        transaction.get().note(
          'CMFActivity ' + '/'.join(self.object_path) + '/' + self.method_id
        )
        # Store site info
        setSite(activity_tool.getParentValue())
        if activity_tool.activity_timing_log:
          result = activity_timing_method(method, self.args, self.kw)
        else:
          result = method(*self.args, **self.kw)
      finally:
        setSecurityManager(previous_security_manager)

      # A None method would already have failed when called above.
      if method is None:
        return
      if self.active_process and result is not None:
        active_process = activity_tool.unrestrictedTraverse(
          self.active_process)
        self.activateResult(active_process, result, obj)
      self.setExecutionState(MESSAGE_EXECUTED)
    except:
      self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
327

328 329 330 331 332
  def validate(self, activity, activity_tool, check_order_validation=1):
    """Delegate validation of this message to ``activity``.

    Calls ``activity.validate`` with the activity tool, this message,
    ``check_order_validation`` and every activity parameter of the message
    as keyword arguments, and returns its result.
    """
    return activity.validate(
      activity_tool,
      self,
      check_order_validation=check_order_validation,
      **self.activity_kw
    )

333
  def notifyUser(self, activity_tool, retry=False):
334 335
    """Notify the user that the activity failed."""
    portal = activity_tool.getPortalObject()
336
    user_email = portal.getProperty('email_to_address',
337
                       portal.getProperty('email_from_address'))
338 339
    email_from_name = portal.getProperty('email_from_name',
                       portal.getProperty('email_from_address'))
340
    fail_count = self.line.retry + 1
341
    if retry:
342 343 344 345
      message = "Pending activity already failed %s times" % fail_count
    else:
      message = "Activity failed"
    path = '/'.join(self.object_path)
346
    mail_text = """From: %s <%s>
347
To: %s
348
Subject: %s: %s/%s
349

350
Node: %s
351
Failures: %s
352
User name: %r
353
Uid: %u
354 355
Document: %s
Method: %s
356 357
Arguments: %r
Named Parameters: %r
358 359
""" % (email_from_name, activity_tool.email_from_address, user_email, message,
       path, self.method_id, activity_tool.getCurrentNode(), fail_count,
360
       self.user_name, self.line.uid, path, self.method_id, self.args, self.kw)
361 362 363 364
    if self.traceback:
      mail_text += '\nException:\n' + self.traceback
    if self.call_traceback:
      mail_text += '\nCreated at:\n' + self.call_traceback
365
    try:
366
      portal.MailHost.send(mail_text)
Vincent Pelletier's avatar
Vincent Pelletier committed
367
    except (socket.error, MailHostError), message:
368 369
      LOG('ActivityTool.notifyUser', WARNING,
          'Mail containing failure information failed to be sent: %s' % message)
370

371
  def reactivate(self, activity_tool, activity=DEFAULT_ACTIVITY):
    """Post this message again on its original object.

    The object is resolved with _getObject(), then ``method_id`` is called
    with the stored arguments on ``obj.activate(activity=..., **activity_kw)``
    under the security context set up by changeUser(). The security manager
    in place on entry is always restored.
    """
    # Reactivate the original object.
    obj = self._getObject(activity_tool)
    saved_security_manager = getSecurityManager()
    try:
      # Change user if required (TO BE DONE)
      # We will change the user only in order to execute this method
      self.changeUser(self.user_name, activity_tool)
      active_obj = obj.activate(activity=activity, **self.activity_kw)
      queue_call = getattr(active_obj, self.method_id)
      queue_call(*self.args, **self.kw)
    finally:
      # Use again the previous user
      setSecurityManager(saved_security_manager)
384

385 386 387 388 389 390
  def setExecutionState(self, is_executed, exc_info=None, log=True, context=None):
    """
      Set message execution state.

      is_executed can be one of MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED and
      MESSAGE_NOT_EXECUTABLE (variables defined above).

      exc_info must be - if given - similar to sys.exc_info() return value.

      log must be - if given - True or False. If True, a log line will be
      emitted with failure details. This parameter should only be used when
      invoking this method on a list of messages to avoid log flood. It is
      caller's responsibility to output a log line summing up all errors, and
      to store error in Zope's error_log.

      context must be - if given - an object wrapped in acquisition context.
      It is used to access Zope's error_log object. It is not used if log is
      False.

      Error details are only recorded when the given state is
      MESSAGE_NOT_EXECUTED. In that case exc_info is used, or
      sys.exc_info() when it is not given. If the resulting exc_info does not
      contain any exception, a dummy exception is raised and caught so that
      its stack trace is stored instead: it will hopefully help understand
      why message is in an error state. If the exception type is
      SkippedMessage, nothing is logged and no traceback is stored.
    """
    assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE)
    self.is_executed = is_executed
    if is_executed != MESSAGE_NOT_EXECUTED:
      return
    if not exc_info:
      exc_info = sys.exc_info()
    if self.on_error_callback is not None:
      self.exc_info = exc_info
    exc_type = self.exc_type = exc_info[0]
    if exc_type is SkippedMessage:
      return
    if exc_type is None:
      # Raise a dummy exception, ignore it, fetch it and use it as if it was
      # the error causing message non-execution. This will help identifying
      # the cause of this misbehaviour.
      try:
        raise Exception('Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.')
      except Exception:
        exc_info = sys.exc_info()
    if log:
      LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info)
      # push the error in ZODB error_log
      error_log = getattr(context, 'error_log', None)
      if error_log is not None:
        error_log.raising(exc_info)
    formatted = ExceptionFormatter.format_exception(*exc_info)
    self.traceback = ''.join(formatted[1:])
433 434 435 436

  def getExecutionState(self):
    """Return the execution state last stored by setExecutionState().

    This is the class default, MESSAGE_NOT_EXECUTED, until a state is set.
    """
    return self.is_executed

437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
class GroupedMessage(object):
  """Pairs an object with the message to be executed on it.

  ``args`` and ``kw`` give read-only access to the arguments of the wrapped
  message. ``result`` and ``exc_info`` are plain slots; ``raised()`` fills
  ``exc_info`` and discards any ``result``.
  """
  __slots__ = 'object', '_message', 'result', 'exc_info'

  def __init__(self, object, message):
    self.object = object
    self._message = message

  @property
  def args(self):
    """Positional arguments of the wrapped message."""
    return self._message.args

  @property
  def kw(self):
    """Keyword arguments of the wrapped message."""
    return self._message.kw

  def raised(self, exc_info=None):
    """Record a failure: store ``exc_info`` (or sys.exc_info() when it is
    not given) and drop ``result`` if one was set."""
    self.exc_info = exc_info or sys.exc_info()
    try:
      del self.result
    except AttributeError:
      # No result had been stored: nothing to discard.
      pass

  # XXX: Allowing restricted code to implement a grouping method is
  #      questionable but there already exist some.
  _guarded_writes = 1 # for result
457
allow_class(GroupedMessage)
458 459 460 461 462 463 464 465

# Activity Registration
def activity_dict():
  """Build the registry mapping each activity name to an instance of it.

  Each module imported from ``Activity`` provides a class bearing the same
  name as the module; one instance of every such class is created.
  """
  from Activity import SQLDict, SQLQueue
  return {
    'SQLDict': SQLDict.SQLDict(),
    'SQLQueue': SQLQueue.SQLQueue(),
  }
# The factory is only needed once: the name is rebound to the registry.
activity_dict = activity_dict()


Vincent Pelletier's avatar
Vincent Pelletier committed
466 467
class Method(object):
  """Callable which queues a Message instead of calling a method directly.

  An instance remembers where and how a method must be called; calling the
  instance builds the Message with the call arguments and hands it to the
  activity buffer of the activity tool.
  """
  __slots__ = (
    '_portal_activities',
    '_passive_url',
    '_passive_oid',
    '_activity',
    '_active_process',
    '_active_process_uid',
    '_kw',
    '_method_id',
    '_request',
  )

  def __init__(self, portal_activities, passive_url, passive_oid, activity,
      active_process, active_process_uid, kw, method_id, request):
    """Store every parameter under the slot of the same name (prefixed
    with an underscore)."""
    self._portal_activities = portal_activities
    self._passive_url = passive_url
    self._passive_oid = passive_oid
    self._activity = activity
    self._active_process = active_process
    self._active_process_uid = active_process_uid
    self._kw = kw
    self._method_id = method_id
    self._request = request

  def __call__(self, *args, **kw):
    """Queue a message calling the remembered method with ``args``/``kw``."""
    portal_activities = self._portal_activities
    message = Message(
      url=self._passive_url,
      oid=self._passive_oid,
      active_process=self._active_process,
      active_process_uid=self._active_process_uid,
      activity_kw=self._kw,
      method_id=self._method_id,
      args=args,
      kw=kw,
      request=self._request,
      portal_activities=portal_activities,
    )
    if portal_activities.activity_tracking:
      tracked_values = (
        self._activity,
        '/'.join(message.object_path),
        message.method_id,
        message.args,
        message.kw,
        message.activity_kw,
        message.user_name,
      )
      activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % tracked_values)
    activity_buffer = portal_activities.getActivityBuffer()
    activity_buffer.deferredQueueMessage(
      portal_activities, activity_dict[self._activity], message)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
509

510 511
allow_class(Method)

Vincent Pelletier's avatar
Vincent Pelletier committed
512 513 514
class ActiveWrapper(object):
  __slots__ = (
    '__portal_activities',
515 516
    '__passive_url',
    '__passive_oid',
Vincent Pelletier's avatar
Vincent Pelletier committed
517 518
    '__activity',
    '__active_process',
519
    '__active_process_uid',
Vincent Pelletier's avatar
Vincent Pelletier committed
520 521 522
    '__kw',
    '__request',
  )
523 524 525
  # Shortcut security lookup (avoid calling __getattr__)
  __parent__ = None

526 527
  def __init__(self, portal_activities, url, oid, activity, active_process,
      active_process_uid, kw, request):
528
    # second parameter can be an object or an object's path
529
    self.__portal_activities = portal_activities
530 531
    self.__passive_url = url
    self.__passive_oid = oid
532 533
    self.__activity = activity
    self.__active_process = active_process
534
    self.__active_process_uid = active_process_uid
535 536 537 538 539 540
    self.__kw = kw
    self.__request = request

  def __getattr__(self, name):
    return Method(
      self.__portal_activities,
541 542
      self.__passive_url,
      self.__passive_oid,
543 544
      self.__activity,
      self.__active_process,
545
      self.__active_process_uid,
546 547 548 549
      self.__kw,
      name,
      self.__request,
    )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
550

551
  def __repr__(self):
552 553
    return '<%s at 0x%x to %s>' % (self.__class__.__name__, id(self),
                                   self.__passive_url)
554

555 556 557
# True when activities cannot be executing any more.
has_processed_shutdown = False

558 559 560 561 562 563 564 565
def cancelProcessShutdown():
  """
    This method reverts the effect of calling "process_shutdown" on activity
    tool.

    It releases is_running_lock and resets the module-level
    has_processed_shutdown flag to False.
  """
  global has_processed_shutdown
  # NOTE(review): presumably process_shutdown acquired this lock; releasing
  # a lock which is not held raises - confirm against process_shutdown.
  is_running_lock.release()
  has_processed_shutdown = False
566

567
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
568
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
569 570 571 572 573 574 575 576 577 578 579 580
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transaction)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
581 582 583
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
584
    portal_type = 'Activity Tool'
585
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
586 587
    security = ClassSecurityInfo()

588 589
    isIndexable = False

590 591
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
592
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
593
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
594
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
595
                     ,
596
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
597 598 599 600

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

601 602 603
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

604 605
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
606

607 608
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
609

610 611
    distributingNode = ''
    _nodes = ()
612
    activity_creation_trace = False
613
    activity_tracking = False
614
    activity_timing_log = False
615
    cancel_and_invoke_links_hidden = False
616

617 618 619 620 621
    def SQLDict_setPriority(self, **kw):
      # Wrapper around the SQLDict_setPriority found on the acquisition
      # parent: what it returns when called with src__=1 is logged first,
      # then it is called for real and its result returned.
      # NOTE(review): src__=1 presumably makes the SQL method return its
      # rendered source instead of executing it - confirm.
      sql_method = self.aq_parent.SQLDict_setPriority
      rendered = sql_method(src__=1, **kw)
      LOG('ActivityTool', 0, rendered)
      return sql_method(**kw)

622 623 624 625
    def __init__(self, id=None):
        # Fall back to the class-level id when none is given.
        tool_id = ActivityTool.id if id is None else id
        return Folder.__init__(self, tool_id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
626

627 628 629
    # Filter content (ZMI)
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types: only the entries of
        # all_meta_types() whose 'name' is listed in ``allowed_types`` are
        # returned, in their original order.
        # ``user`` is accepted for signature compatibility and is not used.
        #
        # The former ``all = Folder.filtered_meta_types(self)`` call was
        # dropped: its result was never used and the name shadowed the
        # ``all`` builtin.
        allowed_types = self.allowed_types
        return [meta_type for meta_type in self.all_meta_types()
                if meta_type['name'] in allowed_types]

637 638 639 640 641 642 643 644 645 646 647 648 649 650
    def maybeMigrateConnectionClass(self):
      # Replace the 'cmf_activity_sql_connection' object by an
      # ActivityConnection with the same title and connection string, unless
      # there is no such object or it already is an ActivityConnection.
      connection_id = 'cmf_activity_sql_connection'
      sql_connection = getattr(self, connection_id, None)
      if sql_connection is None:
        return
      if isinstance(sql_connection, ActivityConnection):
        return
      # SQL Connection migration is needed
      LOG('ActivityTool', WARNING, "Migrating MySQL Connection class")
      container = aq_parent(aq_inner(sql_connection))
      container._delObject(sql_connection.getId())
      replacement = ActivityConnection(connection_id,
                                       sql_connection.title,
                                       sql_connection.connection_string)
      container._setObject(connection_id, replacement)

651
    security.declarePrivate('initialize')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
652
    def initialize(self):
      # Migrate the SQL connection class if needed, then let every
      # registered activity initialise itself for this tool without
      # clearing (clear=False).
      self.maybeMigrateConnectionClass()
      for registered_activity in activity_dict.itervalues():
        registered_activity.initialize(self, clear=False)
656

657 658
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
      """
      return True, if we are subscribed to TimerService.
      Otherwise return False.
      """
      service = getTimerService(self)
      if not service:
        LOG('ActivityTool', INFO, 'TimerService not available')
        return False
      own_path = '/'.join(self.getPhysicalPath())
      # NOTE(review): "lisSubscriptions" (sic) is presumably the actual name
      # of the TimerService method - confirm before renaming.
      return own_path in service.lisSubscriptions()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
669

670
    security.declareProtected(Permissions.manage_properties, 'subscribe')
671
    def subscribe(self, REQUEST=None, RESPONSE=None):
        """ subscribe to the global Timer Service

        When RESPONSE is given, it is redirected to manageLoadBalancing with
        a message telling whether the subscription happened.
        """
        service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
        if service:
            service.subscribe(self)
            status = "Subscribed to Timer Service"
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            status = 'TimerService not available'
        url += urllib.quote(status)
        if RESPONSE is not None:
            RESPONSE.redirect(url)
683 684

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
685
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
        """ unsubscribe from the global Timer Service

        When RESPONSE is given, it is redirected to manageLoadBalancing with
        a message telling whether the unsubscription happened.
        """
        service = getTimerService(self)
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
        if service:
            service.unsubscribe(self)
            status = "Unsubscribed from Timer Service"
        else:
            LOG('ActivityTool', INFO, 'TimerService not available')
            status = 'TimerService not available'
        url += urllib.quote(status)
        if RESPONSE is not None:
            RESPONSE.redirect(url)
697

698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775
    security.declareProtected(Permissions.manage_properties, 'isActivityTrackingEnabled')
    def isActivityTrackingEnabled(self):
      # Current value of the activity_tracking flag (False by default at
      # class level, toggled by manage_enable/disableActivityTracking).
      return self.activity_tracking

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTracking')
    def manage_enableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity tracking.

          When RESPONSE is given, it is redirected to
          manageActivitiesAdvanced with a confirmation message.
        """
        self.activity_tracking = True
        if RESPONSE is None:
          return
        url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
        RESPONSE.redirect(url + urllib.quote('Tracking log enabled'))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTracking')
    def manage_disableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Turn activity tracking off.

          Clears the activity_tracking flag. When RESPONSE is given, it is
          redirected to manageActivitiesAdvanced with a confirmation message.
          REQUEST is accepted but not used.
        """
        self.activity_tracking = False
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(), urllib.quote('Tracking log disabled')))

    security.declareProtected(Permissions.manage_properties, 'isActivityTimingLoggingEnabled')
    def isActivityTimingLoggingEnabled(self):
      # Report the current value of the activity_timing_log flag.
      timing_log_enabled = self.activity_timing_log
      return timing_log_enabled

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTimingLogging')
    def manage_enableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Turn activity timing logging on.

          Sets the activity_timing_log flag. When RESPONSE is given, it is
          redirected to manageActivitiesAdvanced with a confirmation message.
          REQUEST is accepted but not used.
        """
        self.activity_timing_log = True
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(), urllib.quote('Timing log enabled')))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTimingLogging')
    def manage_disableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Turn activity timing logging off.

          Clears the activity_timing_log flag. When RESPONSE is given, it is
          redirected to manageActivitiesAdvanced with a confirmation message.
          REQUEST is accepted but not used.
        """
        self.activity_timing_log = False
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(), urllib.quote('Timing log disabled')))

    security.declareProtected(Permissions.manage_properties, 'isActivityCreationTraceEnabled')
    def isActivityCreationTraceEnabled(self):
      # Report the current value of the activity_creation_trace flag.
      creation_trace_enabled = self.activity_creation_trace
      return creation_trace_enabled

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityCreationTrace')
    def manage_enableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Turn the activity creation trace on.

          Sets the activity_creation_trace flag. When RESPONSE is given, it is
          redirected to manageActivitiesAdvanced with a confirmation message.
          REQUEST is accepted but not used.
        """
        self.activity_creation_trace = True
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Activity creation trace enabled')))

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityCreationTrace')
    def manage_disableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Turn the activity creation trace off.

          Clears the activity_creation_trace flag. When RESPONSE is given, it
          is redirected to manageActivitiesAdvanced with a confirmation
          message. REQUEST is accepted but not used.
        """
        self.activity_creation_trace = False
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Activity creation trace disabled')))

776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799
    security.declareProtected(Permissions.manage_properties, 'isCancelAndInvokeLinksHidden')
    def isCancelAndInvokeLinksHidden(self):
      # Report the current value of the cancel_and_invoke_links_hidden flag.
      links_hidden = self.cancel_and_invoke_links_hidden
      return links_hidden

    security.declareProtected(Permissions.manage_properties, 'manage_hideCancelAndInvokeLinks')
    def manage_hideCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
          Hide the cancel and invoke links.

          Sets the cancel_and_invoke_links_hidden flag. When RESPONSE is
          given, it is redirected to manageActivitiesAdvanced with a
          confirmation message. REQUEST is accepted but not used.
        """
        self.cancel_and_invoke_links_hidden = True
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Cancel and invoke links hidden')))

    security.declareProtected(Permissions.manage_properties, 'manage_showCancelAndInvokeLinks')
    def manage_showCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
          Show the cancel and invoke links.

          Clears the cancel_and_invoke_links_hidden flag. When RESPONSE is
          given, it is redirected to manageActivitiesAdvanced with a
          confirmation message. REQUEST is accepted but not used.
        """
        self.cancel_and_invoke_links_hidden = False
        if RESPONSE is None:
          return
        RESPONSE.redirect(
          '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(),
            urllib.quote('Cancel and invoke links visible')))

800 801
    def manage_beforeDelete(self, item, container):
        # Unsubscribe first, then delegate to the manage_beforeDelete
        # implementation obtained through Folder.inheritedAttribute.
        self.unsubscribe()
        inherited_hook = Folder.inheritedAttribute('manage_beforeDelete')
        inherited_hook(self, item, container)
803

804 805
    def manage_afterAdd(self, item, container):
        # Subscribe first, then delegate to the manage_afterAdd
        # implementation obtained through Folder.inheritedAttribute.
        self.subscribe()
        inherited_hook = Folder.inheritedAttribute('manage_afterAdd')
        inherited_hook(self, item, container)
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827

    def getServerAddress(self):
        """
        Backward-compatibility code only.

        Return an "ip:port" string, computed once and then kept in the
        module-level _server_address. The address comes from the first
        entry of asyncore's socket_map that has an 'addr' attribute and whose
        class is ZServer.HTTPServer.zhttp_server; ip and port stay empty when
        no such entry exists. An ip of 0.0.0.0 is replaced by the address the
        local host name resolves to.
        """
        global _server_address
        if _server_address is not None:
            return _server_address
        ip = port = ''
        from asyncore import socket_map
        for server in socket_map.values():
            if not hasattr(server, 'addr'):
                continue
            # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
            class_name = str(getattr(server, '__class__', 'unknown'))
            if class_name == 'ZServer.HTTPServer.zhttp_server':
                ip, port = server.addr
                break
        if ip == '0.0.0.0':
            ip = socket.gethostbyname(socket.gethostname())
        _server_address = '%s:%s' % (ip, port)
        return _server_address

828
    def getCurrentNode(self):
        """
          Return current node identifier.

          The identifier is kept in the module-level currentNode once known.
          It is read from the 'node-id' entry of the 'cmfactivity' product
          configuration; when that is not set, a DeprecationWarning is issued
          and the server address is used instead.
        """
        global currentNode
        if currentNode is None:
          product_config = getattr(getConfiguration(), 'product_config', {})
          activity_config = product_config.get('cmfactivity', {})
          currentNode = activity_config.get('node-id')
          if currentNode is None:
            warnings.warn(
              'Node name auto-generation is deprecated, please add a\n'
              '<product-config CMFActivity>\n'
              '  node-id = ...\n'
              '</product-config>\n'
              'section in your zope.conf, replacing "..." with a cluster-unique '
              'node identifier.',
              DeprecationWarning)
            currentNode = self.getServerAddress()
        return currentNode
847

848 849 850 851 852
    security.declarePublic('getDistributingNode')
    def getDistributingNode(self):
        """ Return the value of the distributingNode attribute """
        distributing_node = self.distributingNode
        return distributing_node

853 854 855 856 857 858 859 860 861 862 863 864 865
    def getNodeList(self, role=None):
      # Return the sorted list of registered node identifiers. When role is
      # given, only nodes registered with exactly that role are listed.
      node_dict = self.getNodeDict()
      if role is None:
        return sorted(node_dict.keys())
      return sorted(
        node_id for node_id, node_role in node_dict.items()
        if node_role == role)

    def getNodeDict(self):
      # Return the node -> role mapping stored in self._nodes. A tuple found
      # there is replaced by an OIBTree in which every listed node gets
      # ROLE_PROCESSING, and that tree is stored back on self.
      node_dict = self._nodes
      if not isinstance(node_dict, tuple):
        return node_dict
      migrated = OIBTree()
      migrated.update([(node_id, ROLE_PROCESSING) for node_id in node_dict])
      self._nodes = migrated
      return migrated

    def registerNode(self, node):
      # Make sure node is present in the node dictionary. A node which is
      # already registered is left untouched.
      node_dict = self.getNodeDict()
      if node in node_dict:
        return
      if not node_dict:
        # We are registering the first node, make
        # it both the distributing node and a processing node.
        role = ROLE_PROCESSING
        self.distributingNode = node
      else:
        # BBB: check if our node was known by address (processing and/or
        # distribution), and migrate it.
        server_address = self.getServerAddress()
        role = node_dict.pop(server_address, ROLE_IDLE)
        if self.distributingNode == server_address:
          self.distributingNode = node
      self.updateNode(node, role)

    def updateNode(self, node, role):
      # Record role for node in the node dictionary, adding the node or
      # overwriting its previous role.
      self.getNodeDict()[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      # List the nodes whose role is ROLE_PROCESSING.
      processing_node_list = self.getNodeList(role=ROLE_PROCESSING)
      return processing_node_list

895
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
896 897
    def getIdleNodeList(self):
      # List the nodes whose role is ROLE_IDLE.
      idle_node_list = self.getNodeList(role=ROLE_IDLE)
      return idle_node_list
898

899 900
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name"""
901
      return isinstance(node_name, str)
902

903 904
    security.declarePublic('manage_setDistributingNode')
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
        """
          Set the distributing node.

          A false value, or any value accepted by _isValidNodeName, is stored
          as the distributing node; anything else is rejected and nothing is
          changed. When REQUEST is given, its response is redirected to
          manageLoadBalancing with a message telling which case applied.
        """
        accepted = not distributingNode or self._isValidNodeName(distributingNode)
        if accepted:
          self.distributingNode = distributingNode
          message = "Distributing Node successfully changed."
        else:
          message = "Malformed Distributing Node."
        if REQUEST is not None:
          REQUEST.RESPONSE.redirect(
              REQUEST.URL1 +
              '/manageLoadBalancing?manage_tabs_message=' +
              urllib.quote(message))

920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """
        Delete the selected unused nodes.

        unused_node_list -- node identifiers to remove from the node
                            dictionary. Identifiers which are not registered
                            are ignored. None means nothing was selected.
        REQUEST -- when given, its response is redirected to
                   manageLoadBalancing with a message describing what was
                   done.

        When the current distributing node is among the listed nodes, the
        distributing node is reset to '' so that no deleted node stays
        designated for distribution.
      """
      distributing_node = self.getDistributingNode()
      distributing_node_deleted = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == distributing_node:
            # Reset the attribute getDistributingNode() reads; assigning any
            # other attribute would leave the deleted node as distributor.
            self.distributingNode = ''
            distributing_node_deleted = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if distributing_node_deleted:
          message += " Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """
        Give the processing role to each of the selected nodes.

        unused_node_list of None means nothing was selected. When REQUEST is
        given, its response is redirected to manageLoadBalancing with a
        message describing what was done.
      """
      nothing_selected = unused_node_list is None
      if not nothing_selected:
        for node in unused_node_list:
          self.updateNode(node, ROLE_PROCESSING)
      if REQUEST is None:
        return
      if nothing_selected:
        message = "No unused node selected, nothing done."
      else:
        message = "Nodes now procesing: %r." % (unused_node_list, )
      REQUEST.RESPONSE.redirect(
        REQUEST.URL1 +
        '/manageLoadBalancing?manage_tabs_message=' +
        urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """
        Give the idle role to each of the selected nodes.

        processing_node_list of None means nothing was selected. When REQUEST
        is given, its response is redirected to manageLoadBalancing with a
        message describing what was done.
      """
      nothing_selected = processing_node_list is None
      if not nothing_selected:
        for node in processing_node_list:
          self.updateNode(node, ROLE_IDLE)
      if REQUEST is None:
        return
      if nothing_selected:
        message = "No used node selected, nothing done."
      else:
        message = "Nodes now unused %r." % (processing_node_list, )
      REQUEST.RESPONSE.redirect(
        REQUEST.URL1 +
        '/manageLoadBalancing?manage_tabs_message=' +
        urllib.quote(message))
976

977 978 979 980 981
    def process_shutdown(self, phase, time_in_phase):
        """
          Prevent shutdown from happening while an activity queue is
          processing a batch.

          Only the first call made with phase 3 does anything: it records
          that shutdown was processed, then waits until is_running_lock can
          be acquired. The lock is not released here. time_in_phase is not
          used.
        """
        global has_processed_shutdown
        if phase != 3 or has_processed_shutdown:
          return
        has_processed_shutdown = True
        LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
        is_running_lock.acquire()
        LOG('CMFActivity', INFO, "Shutdown: Activities finished.")

989
    def process_timer(self, tick, interval, prev="", next=""):
      """
      Call distribute() if we are the Distributing Node and call tic()
      with our node number.
      This method is called by TimerService in the interval given
      in zope.conf. The Default is every 5 seconds.

      The tick, interval, prev and next arguments are not used by this
      method. Nothing is done when timerservice_lock cannot be acquired
      immediately. Exceptions raised once the security manager has been
      saved are logged and not propagated; the previous security manager is
      restored and the lock released in all cases.
      """
      # Prevent TimerService from starting multiple threads in parallel
      if timerservice_lock.acquire(0):
        try:
          # make sure our skin is set-up. On CMF 1.5 it's setup by acquisition,
          # but on 2.2 it's by traversal, and our site probably wasn't traversed
          # by the timerserver request, which goes into the Zope Control_Panel
          # calling it a second time is a harmless and cheap no-op.
          # both setupCurrentSkin and REQUEST are acquired from containers.
          self.setupCurrentSkin(self.REQUEST)
          # Remember the caller's security manager so that it can be put back
          # once activities have run as another user.
          old_sm = getSecurityManager()
          try:
            # get owner of portal_catalog, so normally we should be able to
            # have the permission to invoke all activities
            user = self.portal_catalog.getWrappedOwner()
            newSecurityManager(self.REQUEST, user)

            # Make sure this node is registered before looking up its
            # position among processing nodes.
            currentNode = self.getCurrentNode()
            self.registerNode(currentNode)
            processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

            # only distribute when we are the distributingNode
            if self.getDistributingNode() == currentNode:
              self.distribute(len(processing_node_list))

            # SkinsTool uses a REQUEST cache to store skin objects, as
            # with TimerService we have the same REQUEST over multiple
            # portals, we clear this cache to make sure the cache doesn't
            # contains skins from another portal.
            try:
              self.getPortalObject().portal_skins.changeSkin(None)
            except AttributeError:
              pass

            # call tic for the current processing_node
            # the processing_node numbers are the indices of the elements
            # in the node list +1 because processing_node starts from 1
            if currentNode in processing_node_list:
              self.tic(processing_node_list.index(currentNode) + 1)
          except:
            # Catch ALL exception to avoid killing timerserver.
            LOG('ActivityTool', ERROR, 'process_timer received an exception',
                error=sys.exc_info())
          finally:
            setSecurityManager(old_sm)
        finally:
          timerservice_lock.release()
1042

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1043 1044 1045 1046 1047 1048
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load: ask every activity queue registered in
        activity_dict to distribute, passing it node_count.
      """
      for queue in activity_dict.itervalues():
        queue.distribute(aq_inner(self), node_count)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1051

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1052
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1053
    def tic(self, processing_node=1, force=0):
      """
        Starts again an activity
        processing_node starts from 1 (there is not node 0)

        processing_node -- number handed to each queue's dequeueMessage.
        force -- when true, run even if the number of active threads has
                 already reached max_active_threads.

        Raises RuntimeError('Too many threads') when the thread limit is
        reached and force is false. The module-level active_threads counter
        is incremented for the duration of the call and always decremented
        on the way out.
      """
      global active_threads

      # return if the number of threads is too high
      # else, increase the number of active_threads and continue
      tic_lock.acquire()
      try:
        if active_threads >= max_active_threads and not force:
          raise RuntimeError('Too many threads')
        active_threads += 1
      finally:
        # Released on both the normal and the error path.
        tic_lock.release()

      inner_self = aq_inner(self)

      try:
        # Loop as long as there are activities. Always process the queue with
        # "highest" priority. If several queues have same highest priority, do
        # not choose one that has just been processed.
        # This algorithm is fair enough because we only have 2 queues.
        # Otherwise, a round-robin of highest-priority queues would be required.
        # XXX: We always finish by iterating over all queues, in case that
        #      getPriority does not see messages dequeueMessage would process.
        last = None
        def sort_key(activity):
          # "activity is last" sorts the queue processed last after others
          # of equal priority.
          return activity.getPriority(self), activity is last
        while is_running_lock.acquire(0):
          try:
            for last in sorted(activity_dict.values(), key=sort_key):
              # Transaction processing is the responsibility of the activity
              if not last.dequeueMessage(inner_self, processing_node):
                # A false result restarts the outer loop, so that queues
                # get sorted by priority again.
                break
            else:
              # Every queue returned a true value: stop looping.
              break
          finally:
            is_running_lock.release()
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        try:
          active_threads -= 1
        finally:
          tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1099

1100
    def hasActivity(self, *args, **kw):
      # Check in each queue whether the object has deferred tasks.
      # The object is the first positional argument; when no argument is
      # provided, the check is made on self. Keyword arguments are passed on
      # to each queue.
      obj = args[0] if args else self
      for queue in activity_dict.itervalues():
        if queue.hasActivity(aq_inner(self), obj, **kw):
          return True
      return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1111

1112 1113 1114 1115 1116 1117 1118 1119
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get activity buffer for this thread for this activity tool.
        If no activity buffer is found at lowest level and create_if_not_found
        is True, create one.
        Intermediate level is unconditionally created if nonexistent because
        chances are it will be used in the instance life.

        Returns the ActivityBuffer of the calling thread, or None when there
        is none and create_if_not_found is false. A lookup which found
        nothing does not prevent a later call from creating the buffer.
      """
      # XXX: using a volatile attribute to cache getPhysicalPath result.
      # This cache may need invalidation if all the following is
      # simultaneously true:
      # - ActivityTool instances can be moved in object tree
      # - moved instance is used to get access to its activity buffer
      # - another instance is put in the place of the original, and used to
      #   access its activity buffer
      # ...which seems currently unlikely, and as such is left out.
      try:
        my_instance_key = self._v_physical_path
      except AttributeError:
        # Safeguard: make sure we are wrapped in acquisition context before
        # using our path as an activity tool instance-wide identifier.
        assert getattr(self, 'aq_self', None) is not None
        self._v_physical_path = my_instance_key = self.getPhysicalPath()
      thread_activity_buffer = global_activity_buffer[my_instance_key]
      my_thread_key = get_ident()
      try:
        activity_buffer = thread_activity_buffer[my_thread_key]
      except KeyError:
        activity_buffer = None
      # A missing entry and a stored None are handled alike, so that an
      # earlier create_if_not_found=False lookup cannot make later calls
      # return None forever.
      if activity_buffer is None and create_if_not_found:
        activity_buffer = ActivityBuffer()
        thread_activity_buffer[my_thread_key] = activity_buffer
      return activity_buffer
1146

1147
    def activateObject(self, object, activity=DEFAULT_ACTIVITY, active_process=None, **kw):
      # Build the ActiveWrapper through which calls on "object" get queued.
      #
      # object -- either a '/'-separated path string or an object providing
      #           getPhysicalPath().
      # activity -- passed through to ActiveWrapper.
      # active_process -- None, a path string (traversed to obtain its uid),
      #                   or an object providing getUid() and
      #                   getPhysicalPath().
      # kw -- activity keyword arguments; a 'serialization_tag' of None is
      #       dropped.
      if active_process is None:
        active_process_uid = None
      elif isinstance(active_process, str):
        # TODO: deprecate
        active_process_uid = self.unrestrictedTraverse(active_process).getUid()
      else:
        active_process_uid = active_process.getUid()
        active_process = active_process.getPhysicalPath()
      if isinstance(object, str):
        oid = None
        url = tuple(object.split('/'))
      else:
        try:
          oid = aq_base(object)._p_oid
          # Note that it's too early to get the OID of a newly created object,
          # so at this point, oid may still be None.
        except AttributeError:
          # No _p_oid attribute at all: fall back to None instead of leaving
          # oid unbound for the ActiveWrapper call below.
          oid = None
        url = object.getPhysicalPath()
      if kw.get('serialization_tag', False) is None:
        del kw['serialization_tag']
      return ActiveWrapper(self, url, oid, activity,
                           active_process, active_process_uid, kw,
                           getattr(self, 'REQUEST', None))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1172

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1173
    def getRegisteredMessageList(self, activity):
      # Return what the given activity queue reports as registered messages
      # for the current activity buffer, or an empty list when there is no
      # buffer (none is created here).
      activity_buffer = self.getActivityBuffer(create_if_not_found=False)
      if activity_buffer is None:
        return []
      #activity_buffer._register() # This is required if flush flush is called outside activate
      inner_self = aq_inner(self)
      return activity.getRegisteredMessageList(activity_buffer, inner_self)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1181

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1182
    def unregisterMessage(self, activity, message):
      # Ask the given activity queue to unregister message from the current
      # activity buffer, and return whatever the queue returns.
      current_buffer = self.getActivityBuffer()
      #current_buffer._register()
      inner_self = aq_inner(self)
      return activity.unregisterMessage(current_buffer, inner_self, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1186

1187
    def flush(self, obj, invoke=0, **kw):
      # Flush every activity queue for one object. obj is either a physical
      # path tuple or an object providing getPhysicalPath(). invoke and the
      # extra keyword arguments are handed to each queue.
      self.getActivityBuffer()
      object_path = obj if isinstance(obj, tuple) else obj.getPhysicalPath()
      for queue in activity_dict.itervalues():
        queue.flush(aq_inner(self), object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1195 1196

    def invoke(self, message):
      # Execute one message by calling it with the activity tool as context.
      #
      # When self is wrapped in an acquisition chain, the message is called
      # with a copy of self re-wrapped around a clone of the current request,
      # into which the request information saved on the message is restored.
      # Otherwise the message is called with self directly.
      if self.activity_tracking:
        activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
      # False means "no localizer context was replaced"; None is a possible
      # saved value (no previous context for this thread).
      old_localizer_context = False
      if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extract base objects.
        base_chain = [aq_base(x) for x in self.aq_chain]
        # Grab existing request (last chain item) and create a copy.
        request_container = base_chain.pop()
        request = request_container.REQUEST
        # Generate PARENTS value. Sadly, we cannot reuse base_chain since
        # PARENTS items must be wrapped in acquisition
        parents = []
        application = self.getPhysicalRoot().aq_base
        for parent in self.aq_chain:
          if parent.aq_base is application:
            break
          parents.append(parent)
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # running unit tests. Recreate it if it does not exist.
        # NOTE(review): request.other is used as a mapping on the next line,
        # while this test looks PARENTS up as an attribute; if it is a plain
        # dict the test is always true and PARENTS is always overwritten -
        # confirm whether a key lookup was intended.
        if getattr(request.other, 'PARENTS', None) is None:
          request.other['PARENTS'] = parents
        # XXX: PATH_INFO might not be set when running unit tests.
        if request.environ.get('PATH_INFO') is None:
          request.environ['PATH_INFO'] = '/Control_Panel/timer_service/process_timer'

        # restore request information
        new_request = request.clone()
        request_info = message.request_info
        # PARENTS is truncated by clone
        new_request.other['PARENTS'] = parents
        if '_script' in request_info:
          new_request._script = request_info['_script']
        if 'SERVER_URL' in request_info:
          new_request.other['SERVER_URL'] = request_info['SERVER_URL']
        if 'VirtualRootPhysicalPath' in request_info:
          new_request.other['VirtualRootPhysicalPath'] = request_info['VirtualRootPhysicalPath']
        if 'HTTP_ACCEPT_LANGUAGE' in request_info:
          new_request.environ['HTTP_ACCEPT_LANGUAGE'] = request_info['HTTP_ACCEPT_LANGUAGE']
          # Replace Localizer/iHotfix Context, saving existing one
          localizer_context = LocalizerContext(new_request)
          id = get_ident()
          localizer_lock.acquire()
          try:
            old_localizer_context = localizer_contexts.get(id)
            localizer_contexts[id] = localizer_context
          finally:
            localizer_lock.release()
          # Execute Localizer/iHotfix "patch 2"
          new_request.processInputs()

        new_request_container = request_container.__class__(REQUEST=new_request)
        # Recreate acquisition chain.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
      try:
        message(my_self)
      finally:
        if my_self is not self: # We rewrapped self
          # Restore default skin selection
          skinnable = self.getPortalObject()
          skinnable.changeSkin(skinnable.getSkinNameFromRequest(request))
        if old_localizer_context is not False:
          # Restore Localizer/iHotfix context
          id = get_ident()
          localizer_lock.acquire()
          try:
            if old_localizer_context is None:
              del localizer_contexts[id]
            else:
              localizer_contexts[id] = old_localizer_context
          finally:
            localizer_lock.release()
      if self.activity_tracking:
        activity_tracking_logger.info('invoked message')
      if my_self is not self: # We rewrapped self
        # Hand everything held by the cloned request over to the original
        # request.
        for held in my_self.REQUEST._held:
          self.REQUEST._hold(held)
1279

1280
    def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
      # Execute several messages with a single call to a group method.
      #
      # method_id -- path of the group method, traversed from the portal.
      # message_list -- messages to execute together.
      # activity -- activity used to re-post objects which provide the
      #             message's alternate method.
      # merge_duplicate -- when true, an object path is only processed once
      #                    over the whole message list.
      #
      # The execution state of each message is set according to the outcome
      # of its objects.
      if self.activity_tracking:
        activity_tracking_logger.info(
          'invoking group messages: method_id=%s, paths=%s'
          % (method_id, ['/'.join(m.object_path) for m in message_list]))
      # Invoke a group method.
      # message -> list of GroupedMessage built for that message
      message_dict = {}
      path_set = set()
      # Filter the list of messages. If an object is not available, mark its
      # message as non-executable. In addition, expand an object if necessary,
      # and make sure that no duplication happens.
      for m in message_list:
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
        try:
          object_list = m.getObjectList(self)
          if object_list is None:
            continue
          message_dict[m] = expanded_object_list = []
          for subobj in object_list:
            if merge_duplicate:
              path = subobj.getPath()
              if path in path_set:
                continue
              path_set.add(path)
            if alternate_method_id is not None \
               and hasattr(aq_base(subobj), alternate_method_id):
              # if this object is alternated,
              # generate a new single active object
              activity_kw = m.activity_kw.copy()
              activity_kw.pop('group_method_id', None)
              activity_kw.pop('group_id', None)
              active_obj = subobj.activate(activity=activity, **activity_kw)
              getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
            else:
              expanded_object_list.append(GroupedMessage(subobj, m))
        except:
          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)

      # Flat list of all grouped messages, over all messages.
      expanded_object_list = sum(message_dict.itervalues(), [])
      try:
        if expanded_object_list:
          traverse = self.getPortalObject().unrestrictedTraverse
          # FIXME: how to apply security here?
          # NOTE: The callee must update each processed item of
          #       expanded_object_list, by setting:
          #       - 'exc_info' in case of error
          #       - 'result' otherwise, with None or the result to post
          #          on the active process
          #       Skipped item must not be touched.
          traverse(method_id)(expanded_object_list)
      except:
        # In this case, the group method completely failed.
        exc_info = sys.exc_info()
        for m in message_dict:
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        LOG('WARNING ActivityTool', 0,
            'Could not call method %s on objects %s' %
            (method_id, [x.object for x in expanded_object_list]),
            error=exc_info)
        error_log = getattr(self, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
      else:
        # Note there can be partial failures.
        for m, expanded_object_list in message_dict.iteritems():
          result_list = []
          for result in expanded_object_list:
            try:
              if result.result is not None:
                result_list.append(result)
            except AttributeError:
              # No 'result' attribute: the item either carries 'exc_info' or
              # was not touched by the group method.
              exc_info = getattr(result, "exc_info", (SkippedMessage,))
              break # failed or skipped message
          else:
            # Every item of this message has a result: post the non-None
            # ones on the active process, if the message has one.
            try:
              if result_list and m.active_process:
                active_process = traverse(m.active_process)
                for result in result_list:
                  m.activateResult(active_process, result.result, result.object)
            except:
              exc_info = None
            else:
              m.setExecutionState(MESSAGE_EXECUTED, context=self)
              continue
          # Reached on a failed or skipped item, or when posting results
          # raised.
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, context=self)
      if self.activity_tracking:
        activity_tracking_logger.info('invoked group messages')
1368

1369 1370 1371 1372
    security.declarePrivate('dummyGroupMethod')
    class dummyGroupMethod(object):
      # Traversing an instance with a method id yields a group method which
      # calls that method on the object of each grouped message in turn,
      # storing the returned value as the grouped message's result.
      def __bobo_traverse__(self, REQUEST, method_id):
        def group_method(message_list):
          # The user is switched each time the message's user name differs
          # from the previous one. On the first exception, raised() is called
          # on the grouped message being processed and the rest is left
          # untouched. The initial security manager is always restored.
          current_user_name = None
          original_sm = getSecurityManager()
          try:
            for m in message_list:
              message = m._message
              if current_user_name != message.user_name:
                current_user_name = message.user_name
                message.changeUser(current_user_name, m.object)
              method = getattr(m.object, method_id)
              m.result = method(*m.args, **m.kw)
          except Exception:
            m.raised()
          finally:
            setSecurityManager(original_sm)
        return group_method
    # Replace the class by its single instance.
    dummyGroupMethod = dummyGroupMethod()

1389 1390
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
      # Wrap the call described by (path, method_id, args, kw) in a Message
      # and queue it on the activity registered under "activity" in
      # activity_dict. Nothing is returned.
      # NOTE(review): documented with comments only, so that no docstring
      # is added to a method which had none - confirm whether one is wanted.
      # Some security checking should be made here XXX
      # The return value of getActivityBuffer is deliberately ignored.
      self.getActivityBuffer()
      queue_message = activity_dict[activity].queueMessage
      inner_self = aq_inner(self)
      new_message = Message(path, active_process, activity_kw, method_id,
                            args, kw, portal_activities=self)
      queue_message(inner_self, new_message)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1396

1397
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1398 1399 1400 1401 1402 1403
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Flush the activities of "method_id" for the object located at
        "object_path", asking for them to be invoked (invoke=1).

        object_path -- a '/'-separated string, which is split into a tuple
                       of path elements, or any other value, which is
                       passed to flush() unchanged
        method_id -- forwarded to flush() as "method_id"
        REQUEST -- when given, the result of redirecting to the
                   "manageActivities" page of this tool is returned;
                   otherwise None is returned
      """
      # isinstance is used rather than an exact type comparison so that
      # instances of str subclasses are split into path elements as well.
      if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path, method_id=method_id, invoke=1)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1408

1409
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageRestart')
1410 1411 1412 1413
    def manageRestart(self, message_uid_list, activity, REQUEST=None):
      """
        Restart one or several messages.

        message_uid_list -- a list of message uids; any value which is not
                            a list is wrapped into a one-element list
        activity -- key of the activity in activity_dict; its "sql_table"
                    is the table acted upon
        REQUEST -- when given, the result of redirecting to the "view"
                   page of this tool is returned
      """
      uid_list = message_uid_list
      if not isinstance(uid_list, list):
        uid_list = [uid_list]
      self.SQLBase_makeMessageListAvailable(
        table=activity_dict[activity].sql_table,
        uid=uid_list)
      if REQUEST is None:
        return
      return REQUEST.RESPONSE.redirect(
        '%s/%s' % (self.absolute_url(), 'view'))

1422
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Deprecated (a warning is logged on every call): flush the
        activities of "method_id" for the object located at "object_path"
        with invoke=0.

        object_path -- a '/'-separated string, which is split into a tuple
                       of path elements, or any other value, which is
                       passed to flush() unchanged
        method_id -- forwarded to flush() as "method_id"
        REQUEST -- when given, the result of redirecting to the
                   "manageActivities" page of this tool is returned;
                   otherwise None is returned
      """
      LOG('ActivityTool', WARNING,
          '"manageCancel" method is deprecated, use "manageDelete" instead.')
      # isinstance is used rather than an exact type comparison so that
      # instances of str subclasses are split into path elements as well.
      if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
      self.flush(object_path, method_id=method_id, invoke=0)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (
          self.absolute_url(), 'manageActivities'))
1435 1436 1437 1438 1439 1440

    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageDelete' )
    def manageDelete(self, message_uid_list, activity, REQUEST=None):
      """
        Delete one or several messages.

        message_uid_list -- a list of message uids; any value which is not
                            a list is wrapped into a one-element list
        activity -- key of the activity in activity_dict; its "sql_table"
                    is the table messages are deleted from
        REQUEST -- when given, the result of redirecting to the "view"
                   page of this tool is returned
      """
      uid_list = message_uid_list
      if not isinstance(uid_list, list):
        uid_list = [uid_list]
      self.SQLBase_delMessage(
        table=activity_dict[activity].sql_table,
        uid=uid_list)
      if REQUEST is None:
        return
      return REQUEST.RESPONSE.redirect(
        '%s/%s' % (self.absolute_url(), 'view'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1448

1449 1450
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
1451
    def manageClearActivities(self, keep=1, RESPONSE=None):
      """
        Clear all activities (recreating tables) by initialising every
        activity of activity_dict with clear=True.

        keep -- not used by this method
        RESPONSE -- when given, the result of redirecting to the
                    "manageActivitiesAdvanced" page, with a confirmation
                    message, is returned
      """
      for queue in activity_dict.itervalues():
        queue.initialize(self, clear=True)

      if RESPONSE is None:
        return
      return RESPONSE.redirect(
        self.absolute_url_path() +
        '/manageActivitiesAdvanced?manage_tabs_message=Activities%20Cleared')
1461

1462 1463 1464 1465 1466 1467 1468 1469 1470
    security.declarePublic('getMessageTempObjectList')
    def getMessageTempObjectList(self, **kw):
      """
        Return the messages waiting in queues as a list of temporary
        objects.

        All keyword arguments are forwarded to getMessageList. For each
        message found, a temporary object is created with newContent and
        receives a copy of the attributes of the message.
      """
      temp_object_list = []
      for queued_message in self.getMessageList(**kw):
        temp_object = self.newContent(temp_object=1)
        # Copy the instance attributes of the message onto the new object.
        temp_object.__dict__.update(**queued_message.__dict__)
        temp_object_list.append(temp_object)
      return temp_object_list

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1475
    security.declarePublic('getMessageList')
1476
    def getMessageList(self, activity=None, **kw):
      """
        List messages waiting in queues.

        activity -- when true, only the activity registered under this key
                    in activity_dict is queried and its result is returned
                    as is; otherwise the results of all activities are
                    concatenated into one list
        Remaining keyword arguments are forwarded to the getMessageList
        method of the activities.
      """
      if not activity:
        collected_list = []
        for queue in activity_dict.itervalues():
          try:
            collected_list.extend(queue.getMessageList(aq_inner(self), **kw))
          except AttributeError:
            # Such an activity is logged and skipped, not fatal.
            LOG('getMessageList, could not get message from Activity:',0,queue)
        return collected_list

      return activity_dict[activity].getMessageList(aq_inner(self), **kw)

1491 1492 1493 1494 1495 1496
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag, summed
        over all activities of activity_dict.
      """
      return sum(queue.countMessageWithTag(aq_inner(self), value)
                 for queue in activity_dict.itervalues())

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameters,
        summed over all activities of activity_dict.

        Parameters allowed:

        method_id : the id of the method
        path : for activities on a particular object
        tag : activities with a particular tag
        message_uid : activities with a particular uid
      """
      return sum(queue.countMessage(aq_inner(self), **kw)
                 for queue in activity_dict.itervalues())

1518
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1519
    def newActiveProcess(self, REQUEST=None, **kw):
      # Create an "Active Process" document in this tool and return it.
      # Keyword arguments are forwarded to newContent. When REQUEST is
      # given, its response is also redirected to 'manage_main'.
      # note: if one wants to create an Active Process without ERP5 products,
      # she can call ActiveProcess.addActiveProcess
      active_process = self.newContent(portal_type="Active Process", **kw)
      if REQUEST is None:
        return active_process
      REQUEST['RESPONSE'].redirect( 'manage_main' )
      return active_process
1526

1527
    # Active synchronisation methods
1528
    security.declarePrivate('validateOrder')
1529
    def validateOrder(self, message, validator_id, validation_value):
      # True when getDependentMessageList finds at least one entry for the
      # given message / validator / value, False otherwise.
      return len(self.getDependentMessageList(
        message, validator_id, validation_value)) > 0

    security.declarePrivate('getDependentMessageList')
    def getDependentMessageList(self, message, validator_id, validation_value):
      # Ask every activity of activity_dict which provides a method named
      # "_validate_" + validator_id for what it finds for "message" and
      # "validation_value". The result is a list of (activity, entry)
      # pairs, one per entry returned by those methods.
      dependent_list = []
      validator_name = "_validate_" + validator_id
      for queue in activity_dict.itervalues():
        validator = getattr(queue, validator_name, None)
        if validator is None:
          # This activity does not implement the validator.
          continue
        found = validator(aq_inner(self), message, validation_value)
        if found:
          dependent_list.extend([(queue, m) for m in found])
      return dependent_list
1544

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1545 1546
    # Required for tests (time shift)
    def timeShift(self, delay):
      # Forward "delay" to the timeShift method of every activity of
      # activity_dict.
      # NOTE(review): documented with comments only, so that no docstring
      # is added to a method which had none - confirm whether one is wanted.
      for queue in activity_dict.itervalues():
        queue.timeShift(aq_inner(self), delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1549

1550
InitializeClass(ActivityTool)