ActivityTool.py 58.9 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29 30 31 32
import socket
import urllib
import threading
import sys
Vincent Pelletier's avatar
Vincent Pelletier committed
33
from types import StringType
34 35
import re

36
from Products.CMFCore import permissions as CMFCorePermissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37
from Products.ERP5Type.Core.Folder import Folder
38
from Products.CMFActivity.ActiveResult import ActiveResult
39
from Products.CMFActivity.ActiveObject import DEFAULT_ACTIVITY
40
from Products.CMFActivity.ActivityConnection import ActivityConnection
41
from Products.PythonScripts.Utility import allow_class
42
from AccessControl import ClassSecurityInfo, Permissions
Jérome Perrin's avatar
Jérome Perrin committed
43 44 45 46
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from AccessControl.SecurityManagement import getSecurityManager
47
from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser
48
from Products.ERP5Type.Globals import InitializeClass, DTMLFile
49
from Acquisition import aq_base, aq_inner, aq_parent
50
from ActivityBuffer import ActivityBuffer
51
from ActivityRuntimeEnvironment import BaseMessage
52
from zExceptions import ExceptionFormatter
53
from BTrees.OIBTree import OIBTree
54 55 56 57 58 59 60 61 62 63 64 65 66

try:
  from Products import iHotfix
  localizer_lock = iHotfix._the_lock
  localizer_contexts = iHotfix.contexts
  LocalizerContext = iHotfix.Context
except ImportError:
  # Localizer 1.2 includes iHotFix patches
  import Products.Localizer.patches
  localizer_lock = Products.Localizer.patches._requests_lock
  localizer_contexts = Products.Localizer.patches._requests
  LocalizerContext = lambda request: request

67

68
from ZODB.POSException import ConflictError
69
from Products.MailHost.MailHost import MailHostError
Jean-Paul Smets's avatar
Jean-Paul Smets committed
70

71
from zLOG import LOG, INFO, WARNING, ERROR
72
from warnings import warn
73
from time import time
74 75

try:
76
  from Products.TimerService import getTimerService
77
except ImportError:
78 79
  def getTimerService(self):
    pass
Jean-Paul Smets's avatar
Jean-Paul Smets committed
80

81
from traceback import format_list, extract_stack
82

83
# minimal IP:Port regexp
84
NODE_RE = re.compile('^\d+\.\d+\.\d+\.\d+:\d+$')
85

Jean-Paul Smets's avatar
Jean-Paul Smets committed
86 87 88 89
# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
Vincent Pelletier's avatar
Vincent Pelletier committed
90
is_initialized = False
91 92
tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls
timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy
93
is_running_lock = threading.Lock()
94 95 96
currentNode = None
ROLE_IDLE = 0
ROLE_PROCESSING = 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
97 98 99 100

# Activity Registration
activity_dict = {}

101 102 103 104 105
# Logging channel definitions
import logging
# Main logging channel
activity_logger = logging.getLogger('CMFActivity')
# Some logging subchannels
106
activity_tracking_logger = logging.getLogger('Tracking')
107
activity_timing_logger = logging.getLogger('CMFActivity.TimingLog')
108 109 110 111 112 113 114 115 116 117

# Direct logging to "[instancehome]/log/CMFActivity.log", if this directory exists.
# Otherwise, it will end up in root logging facility (ie, event.log).
from App.config import getConfiguration
import os
instancehome = getConfiguration().instancehome
if instancehome is not None:
  log_directory = os.path.join(instancehome, 'log')
  if os.path.isdir(log_directory):
    from Signals import Signals
118 119
    from ZConfig.components.logger.loghandler import FileHandler
    log_file_handler = FileHandler(os.path.join(log_directory, 'CMFActivity.log'))
120 121 122 123 124 125 126 127
    # Default zope log format string borrowed from
    # ZConfig/components/logger/factory.xml, but without the extra "------"
    # line separating entries.
    log_file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s", "%Y-%m-%dT%H:%M:%S"))
    Signals.registerZopeSignals([log_file_handler])
    activity_logger.addHandler(log_file_handler)
    activity_logger.propagate = 0

128 129 130 131 132 133 134 135
def activity_timing_method(method, args, kw):
  begin = time()
  try:
    return method(*args, **kw)
  finally:
    end = time()
    activity_timing_logger.info('%.02fs: %r(*%r, **%r)' % (end - begin, method, args, kw))

136 137 138 139 140 141 142
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
global_activity_buffer = {}
from thread import get_ident, allocate_lock
global_activity_buffer_lock = allocate_lock()

Jean-Paul Smets's avatar
Jean-Paul Smets committed
143 144 145
def registerActivity(activity):
  # Must be rewritten to register
  # class and create instance for each activity
146
  #LOG('Init Activity', 0, str(activity.__name__))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
147 148 149
  activity_instance = activity()
  activity_dict[activity.__name__] = activity_instance

150 151 152 153
MESSAGE_NOT_EXECUTED = 0
MESSAGE_EXECUTED = 1
MESSAGE_NOT_EXECUTABLE = 2

154 155

class Message(BaseMessage):
156
  """Activity Message Class.
157

158 159
  Message instances are stored in an activity queue, inside the Activity Tool.
  """
160

161
  active_process = None
162 163
  active_process_uid = None

164 165
  def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
    if isinstance(obj, str):
166
      self.object_path = tuple(obj.split('/'))
167
      activity_creation_trace = False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
168
    else:
169
      self.object_path = obj.getPhysicalPath()
170
      activity_creation_trace = obj.getPortalObject().portal_activities.activity_creation_trace
171
    if active_process is not None:
172
      self.active_process = active_process.getPhysicalPath()
173
      self.active_process_uid = active_process.getUid()
174 175 176
    if activity_kw.get('serialization_tag', False) is None:
      # Remove serialization_tag if it's None.
      del activity_kw['serialization_tag']
Jean-Paul Smets's avatar
Jean-Paul Smets committed
177 178 179 180
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
181
    self.is_executed = MESSAGE_NOT_EXECUTED
Vincent Pelletier's avatar
Vincent Pelletier committed
182 183 184
    self.exc_type = None
    self.exc_value = None
    self.traceback = None
185
    if activity_creation_trace and format_list is not None:
186 187 188 189
      # Save current traceback, to make it possible to tell where a message
      # was generated.
      # Strip last stack entry, since it will always be the same.
      self.call_traceback = ''.join(format_list(extract_stack()[:-1]))
190 191
    else:
      self.call_traceback = None
192
    self.processing = None
193
    self.user_name = str(_getAuthenticatedUser(self))
194
    # Store REQUEST Info
195
    self.request_info = {}
196 197
    request = getattr(obj, 'REQUEST', None)
    if request is not None:
198 199 200 201 202 203 204 205 206
      if 'SERVER_URL' in request.other:
        self.request_info['SERVER_URL'] = request.other['SERVER_URL']
      if 'VirtualRootPhysicalPath' in request.other:
        self.request_info['VirtualRootPhysicalPath'] = \
          request.other['VirtualRootPhysicalPath']
      if 'HTTP_ACCEPT_LANGUAGE' in request.environ:
        self.request_info['HTTP_ACCEPT_LANGUAGE'] = \
          request.environ['HTTP_ACCEPT_LANGUAGE']
      self.request_info['_script'] = list(request._script)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
207

208
  def getObject(self, activity_tool):
209
    """return the object referenced in this message."""
210
    return activity_tool.unrestrictedTraverse(self.object_path)
211

212
  def getObjectList(self, activity_tool):
213
    """return the list of object that can be expanded from this message."""
214
    object_list = []
215
    try:
216
      object_list.append(self.getObject(activity_tool))
217
    except KeyError:
218 219 220 221 222 223
      pass
    else:
      if self.hasExpandMethod():
        expand_method_id = self.activity_kw['expand_method_id']
        # FIXME: how to pass parameters?
        object_list = getattr(object_list[0], expand_method_id)()
224
    return object_list
225

226
  def hasExpandMethod(self):
227 228 229 230 231
    """return true if the message has an expand method.
    An expand method is used to expand the list of objects and to turn a
    big recursive transaction affecting many objects into multiple
    transactions affecting only one object at a time (this can prevent
    duplicated method calls)."""
232
    return self.activity_kw.has_key('expand_method_id')
233

234
  def changeUser(self, user_name, activity_tool):
235
    """restore the security context for the calling user."""
236 237
    uf = activity_tool.getPortalObject().acl_users
    user = uf.getUserById(user_name)
238
    # if the user is not found, try to get it from a parent acl_users
239 240 241 242
    # XXX this is still far from perfect, because we need to store all
    # informations about the user (like original user folder, roles) to
    # replay the activity with exactly the same security context as if
    # it had been executed without activity.
243 244 245
    if user is None:
      uf = activity_tool.getPortalObject().aq_parent.acl_users
      user = uf.getUserById(user_name)
246 247 248
    if user is not None:
      user = user.__of__(uf)
      newSecurityManager(None, user)
249
    else :
250
      LOG("CMFActivity", WARNING,
251
          "Unable to find user %r in the portal" % user_name)
252
      noSecurityManager()
253 254
    return user

255 256 257 258 259
  def activateResult(self, active_process, result, object):
    if not isinstance(result, ActiveResult):
      result = ActiveResult(result=result)
    # XXX Allow other method_id in future
    result.edit(object_path=object, method_id=self.method_id)
260
    active_process.postResult(result)
261

Jean-Paul Smets's avatar
Jean-Paul Smets committed
262
  def __call__(self, activity_tool):
263
    try:
264
      obj = self.getObject(activity_tool)
265
    except KeyError:
266 267 268 269
      LOG('CMFActivity', ERROR,
          'Message failed in getting an object from the path %r' % \
                  (self.object_path,),
          error=sys.exc_info())
270
      self.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=activity_tool)
271
    else:
272
      try:
273 274
        old_security_manager = getSecurityManager()
        try:
275 276 277
          # Change user if required (TO BE DONE)
          # We will change the user only in order to execute this method
          self.changeUser(self.user_name, activity_tool)
278 279 280
          try:
            # XXX: There is no check to see if user is allowed to access
            # that method !
281 282
            method = getattr(obj, self.method_id)
          except:
283 284 285 286
            LOG('CMFActivity', ERROR,
                'Message failed in getting a method %r from an object %r' % \
                       (self.method_id, obj,),
                error=sys.exc_info())
287
            method = None
288
            self.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=activity_tool)
289
          else:
290
            if activity_tool.activity_timing_log:
291 292 293
              result = activity_timing_method(method, self.args, self.kw)
            else:
              result = method(*self.args, **self.kw)
294 295 296 297
        finally:
          setSecurityManager(old_security_manager)

        if method is not None:
298 299 300 301
          if self.active_process and result is not None:
            self.activateResult(
              activity_tool.unrestrictedTraverse(self.active_process),
              result, obj)
302
          self.setExecutionState(MESSAGE_EXECUTED)
303
      except:
304
        self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
305

306 307 308 309 310 311 312
  def validate(self, activity, activity_tool, check_order_validation=1):
    return activity.validate(activity_tool, self,
                             check_order_validation=check_order_validation,
                             **self.activity_kw)

  def getDependentMessageList(self, activity, activity_tool):
    return activity.getDependentMessageList(activity_tool, self, **self.activity_kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
313

314
  def notifyUser(self, activity_tool, retry=False):
315 316
    """Notify the user that the activity failed."""
    portal = activity_tool.getPortalObject()
317
    user_email = portal.getProperty('email_to_address',
318
                       portal.getProperty('email_from_address'))
319

320 321
    email_from_name = portal.getProperty('email_from_name',
                       portal.getProperty('email_from_address'))
322 323 324 325
    call_traceback = ''
    if self.call_traceback:
      call_traceback = 'Created at:\n%s' % self.call_traceback

326
    fail_count = self.line.retry + 1
Julien Muchembled's avatar
typo  
Julien Muchembled committed
327
    if self.getExecutionState() == MESSAGE_NOT_EXECUTABLE:
328 329
      message = "Not executable activity"
    elif retry:
330 331 332 333
      message = "Pending activity already failed %s times" % fail_count
    else:
      message = "Activity failed"
    path = '/'.join(self.object_path)
334
    mail_text = """From: %s <%s>
335
To: %s
336
Subject: %s: %s/%s
337

338
Node: %s
339
Failures: %s
340
User name: %r
341 342
Document: %s
Method: %s
343 344
Arguments: %r
Named Parameters: %r
345 346
%s

Vincent Pelletier's avatar
Vincent Pelletier committed
347
Exception: %s %s
348

349
%s
350 351 352 353
""" % (email_from_name, activity_tool.email_from_address, user_email,
       message, path, self.method_id,
       activity_tool.getCurrentNode(), fail_count,
       self.user_name, path, self.method_id, self.args, self.kw,
354
       call_traceback, self.exc_type, self.exc_value, self.traceback)
355

356 357
    try:
      activity_tool.MailHost.send( mail_text )
Vincent Pelletier's avatar
Vincent Pelletier committed
358 359
    except (socket.error, MailHostError), message:
      LOG('ActivityTool.notifyUser', WARNING, 'Mail containing failure information failed to be sent: %s. Exception was: %s %s\n%s' % (message, self.exc_type, self.exc_value, self.traceback))
360

361
  def reactivate(self, activity_tool, activity=DEFAULT_ACTIVITY):
362 363
    # Reactivate the original object.
    obj= self.getObject(activity_tool)
364
    old_security_manager = getSecurityManager()
365
    try:
366 367 368
      # Change user if required (TO BE DONE)
      # We will change the user only in order to execute this method
      user = self.changeUser(self.user_name, activity_tool)
369
      active_obj = obj.activate(activity=activity, **self.activity_kw)
370 371 372
      getattr(active_obj, self.method_id)(*self.args, **self.kw)
    finally:
      # Use again the previous user
373
      setSecurityManager(old_security_manager)
374

375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
  def setExecutionState(self, is_executed, exc_info=None, log=True, context=None):
    """
      Set message execution state.

      is_executed can be one of MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED and
      MESSAGE_NOT_EXECUTABLE (variables defined above).
      
      exc_info must be - if given - similar to sys.exc_info() return value.

      log must be - if given - True or False. If True, a log line will be
      emited with failure details. This parameter should only be used when
      invoking this method on a list of messages to avoid log flood. It is
      caller's responsability to output a log line summing up all errors, and
      to store error in Zope's error_log.

      context must be - if given - an object wrapped in acquisition context.
      It is used to access Zope's error_log object. It is not used if log is
      False.

      If given state is not MESSAGE_EXECUTED, it will also store given
      exc_info. If not given, it will extract one using sys.exc_info().
      If final exc_info does not contain any exception, current stack trace
      will be stored instead: it will hopefuly help understand why message
      is in an error state.
    """
    assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE)
    self.is_executed = is_executed
    if is_executed != MESSAGE_EXECUTED:
      if exc_info is None:
        exc_info = sys.exc_info()
405 406 407 408 409 410 411
      if exc_info == (None, None, None):
        # Raise a dummy exception, ignore it, fetch it and use it as if it was the error causing message non-execution. This will help identifyting the cause of this misbehaviour.
        try:
          raise Exception, 'Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.'
        except:
          pass
        exc_info = sys.exc_info()
412 413 414 415 416
      if log:
        LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info)
        # push the error in ZODB error_log
        error_log = getattr(context, 'error_log', None)
        if error_log is not None:
417
          error_log.raising(exc_info)
418 419
      self.exc_type = exc_info[0]
      self.exc_value = str(exc_info[1])
420
      self.traceback = ''.join(ExceptionFormatter.format_exception(*exc_info))
421 422 423 424

  def getExecutionState(self):
    return self.is_executed

Jean-Paul Smets's avatar
Jean-Paul Smets committed
425 426
class Method:

427
  def __init__(self, passive_self, activity, active_process, kw, method_id):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
428 429
    self.__passive_self = passive_self
    self.__activity = activity
430
    self.__active_process = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
431 432 433 434
    self.__kw = kw
    self.__method_id = method_id

  def __call__(self, *args, **kw):
435
    m = Message(self.__passive_self, self.__active_process, self.__kw, self.__method_id, args, kw)
436
    portal_activities = self.__passive_self.getPortalObject().portal_activities
437
    if portal_activities.activity_tracking:
438
      activity_tracking_logger.info('queuing message: activity=%s, object_path=%s, method_id=%s, args=%s, kw=%s, activity_kw=%s, user_name=%s' % (self.__activity, '/'.join(m.object_path), m.method_id, m.args, m.kw, m.activity_kw, m.user_name))
439
    activity_dict[self.__activity].queueMessage(portal_activities, m)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
440

441 442
allow_class(Method)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
443
class ActiveWrapper:
444 445 446 447
  # XXX: maybe we should accept and forward an 'activity_tool' parameter,
  #      so that Method:
  #      - does not need to search it again
  #      - a string can be passed as first parameter to ActiveWrapper
Jean-Paul Smets's avatar
Jean-Paul Smets committed
448

449
  def __init__(self, passive_self, activity, active_process, kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
450 451
    self.__dict__['__passive_self'] = passive_self
    self.__dict__['__activity'] = activity
452
    self.__dict__['__active_process'] = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
453 454 455 456
    self.__dict__['__kw'] = kw

  def __getattr__(self, id):
    return Method(self.__dict__['__passive_self'], self.__dict__['__activity'],
457
                  self.__dict__['__active_process'],
Jean-Paul Smets's avatar
Jean-Paul Smets committed
458 459
                  self.__dict__['__kw'], id)

460 461 462 463
  def __repr__(self):
    return '<%s at 0x%x to %r>' % (self.__class__.__name__, id(self),
                                   self.__dict__['__passive_self'])

464 465 466
# True when activities cannot be executing any more.
has_processed_shutdown = False

467 468 469 470 471 472 473 474
def cancelProcessShutdown():
  """
    This method reverts the effect of calling "process_shutdown" on activity
    tool.
  """
  global has_processed_shutdown
  is_running_lock.release()
  has_processed_shutdown = False
475

476
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
477
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
478 479 480 481 482 483 484 485 486 487 488 489
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transactino)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
490 491 492
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
493
    portal_type = 'Activity Tool'
494
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
495 496
    security = ClassSecurityInfo()

497 498
    isIndexable = False

499 500
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
501
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
502
                     , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
503
                     , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
504
                     ,
505
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
506 507 508 509

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

510 511 512
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
    manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )

513 514
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
515 516 517 518 519 520
    
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
    manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
    
    distributingNode = ''
    _nodes = ()
521 522 523
    activity_creation_trace = False
    activity_tracking = False
    activity_timing_log = False
524
    cancel_and_invoke_links_hidden = False
525

526 527 528 529 530
    def SQLDict_setPriority(self, **kw):
      real_SQLDict_setPriority = getattr(self.aq_parent, 'SQLDict_setPriority')
      LOG('ActivityTool', 0, real_SQLDict_setPriority(src__=1, **kw))
      return real_SQLDict_setPriority(**kw)

531 532 533 534
    def __init__(self, id=None):
        if id is None:
          id = ActivityTool.id
        return Folder.__init__(self, id)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
535

536 537 538
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types.
539
        all = Folder.filtered_meta_types(self)
540 541 542 543 544 545
        meta_types = []
        for meta_type in self.all_meta_types():
            if meta_type['name'] in self.allowed_types:
                meta_types.append(meta_type)
        return meta_types

546 547 548 549 550 551 552 553 554 555 556 557 558 559
    def maybeMigrateConnectionClass(self):
      connection_id = 'cmf_activity_sql_connection'
      sql_connection = getattr(self, connection_id, None)
      if (sql_connection is not None and
          not isinstance(sql_connection, ActivityConnection)):
        # SQL Connection migration is needed
        LOG('ActivityTool', WARNING, "Migrating MySQL Connection class")
        parent = aq_parent(aq_inner(sql_connection))
        parent._delObject(sql_connection.getId())
        new_sql_connection = ActivityConnection(connection_id,
                                                sql_connection.title,
                                                sql_connection.connection_string)
        parent._setObject(connection_id, new_sql_connection)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
560 561
    def initialize(self):
      global is_initialized
Sebastien Robin's avatar
Sebastien Robin committed
562
      from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
Jean-Paul Smets's avatar
Jean-Paul Smets committed
563
      # Initialize each queue
564
      for activity in activity_dict.itervalues():
Jean-Paul Smets's avatar
Jean-Paul Smets committed
565
        activity.initialize(self)
566
      self.maybeMigrateConnectionClass()
Vincent Pelletier's avatar
Vincent Pelletier committed
567
      is_initialized = True
568

569 570
    security.declareProtected(Permissions.manage_properties, 'isSubscribed')
    def isSubscribed(self):
Aurel's avatar
Aurel committed
571
        """
572 573 574 575 576 577 578 579 580 581 582 583
        return True, if we are subscribed to TimerService.
        Otherwise return False.
        """
        service = getTimerService(self)
        if not service:
            LOG('ActivityTool', INFO, 'TimerService not available')
            return False
        
        path = '/'.join(self.getPhysicalPath())
        if path in service.lisSubscriptions():
            return True
        return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
584

585
    security.declareProtected(Permissions.manage_properties, 'subscribe')
586
    def subscribe(self, REQUEST=None, RESPONSE=None):
587 588
        """ subscribe to the global Timer Service """
        service = getTimerService(self)
589
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
590
        if not service:
591
            LOG('ActivityTool', INFO, 'TimerService not available')
592 593 594 595
            url += urllib.quote('TimerService not available')
        else:
            service.subscribe(self)
            url += urllib.quote("Subscribed to Timer Service")
596 597
        if RESPONSE is not None:
            RESPONSE.redirect(url)
598 599

    security.declareProtected(Permissions.manage_properties, 'unsubscribe')
600
    def unsubscribe(self, REQUEST=None, RESPONSE=None):
601 602
        """ unsubscribe from the global Timer Service """
        service = getTimerService(self)
603
        url = '%s/manageLoadBalancing?manage_tabs_message=' %self.absolute_url()
604
        if not service:
605
            LOG('ActivityTool', INFO, 'TimerService not available')
606 607 608 609
            url += urllib.quote('TimerService not available')
        else:
            service.unsubscribe(self)
            url += urllib.quote("Unsubscribed from Timer Service")
610 611
        if RESPONSE is not None:
            RESPONSE.redirect(url)
612

613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690
    security.declareProtected(Permissions.manage_properties, 'isActivityTrackingEnabled')
    def isActivityTrackingEnabled(self):
      return self.activity_tracking

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTracking')
    def manage_enableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity tracing.
        """
        self.activity_tracking = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Tracking log enabled')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTracking')
    def manage_disableActivityTracking(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity tracing.
        """
        self.activity_tracking = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Tracking log disabled')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'isActivityTimingLoggingEnabled')
    def isActivityTimingLoggingEnabled(self):
      return self.activity_timing_log

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTimingLogging')
    def manage_enableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity timing logging.
        """
        self.activity_timing_log = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Timing log enabled')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTimingLogging')
    def manage_disableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity timing logging.
        """
        self.activity_timing_log = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Timing log disabled')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'isActivityCreationTraceEnabled')
    def isActivityCreationTraceEnabled(self):
      return self.activity_creation_trace

    security.declareProtected(Permissions.manage_properties, 'manage_enableActivityCreationTrace')
    def manage_enableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Enable activity creation trace.
        """
        self.activity_creation_trace = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Activity creation trace enabled')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_disableActivityCreationTrace')
    def manage_disableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
        """
          Disable activity creation trace.
        """
        self.activity_creation_trace = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Activity creation trace disabled')
          RESPONSE.redirect(url)

691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
    security.declareProtected(Permissions.manage_properties, 'isCancelAndInvokeLinksHidden')
    def isCancelAndInvokeLinksHidden(self):
      return self.cancel_and_invoke_links_hidden

    security.declareProtected(Permissions.manage_properties, 'manage_hideCancelAndInvokeLinks')
    def manage_hideCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
        """
        self.cancel_and_invoke_links_hidden = True
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Cancel and invoke links hidden')
          RESPONSE.redirect(url)

    security.declareProtected(Permissions.manage_properties, 'manage_showCancelAndInvokeLinks')
    def manage_showCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
        """
        """
        self.cancel_and_invoke_links_hidden = False
        if RESPONSE is not None:
          url = '%s/manageActivitiesAdvanced?manage_tabs_message=' % self.absolute_url()
          url += urllib.quote('Cancel and invoke links visible')
          RESPONSE.redirect(url)

715 716
    def manage_beforeDelete(self, item, container):
        self.unsubscribe()
717 718
        Folder.inheritedAttribute('manage_beforeDelete')(self, item, container)
    
719 720
    def manage_afterAdd(self, item, container):
        self.subscribe()
721 722
        Folder.inheritedAttribute('manage_afterAdd')(self, item, container)
       
723 724
    def getCurrentNode(self):
        """ Return current node in form ip:port """
725 726
        global currentNode
        if currentNode is None:
727
          ip = port = ''
728 729
          from asyncore import socket_map
          for k, v in socket_map.items():
730
              if hasattr(v, 'addr'):
731 732 733
                  # see Zope/lib/python/App/ApplicationManager.py: def getServers(self)
                  type = str(getattr(v, '__class__', 'unknown'))
                  if type == 'ZServer.HTTPServer.zhttp_server':
734
                      ip, port = v.addr
735
                      break
736 737
          if ip == '0.0.0.0':
            ip = socket.gethostbyname(socket.gethostname())
738
          currentNode = '%s:%s' %(ip, port)
739 740 741 742 743 744 745
        return currentNode
        
    security.declarePublic('getDistributingNode')
    def getDistributingNode(self):
        """ Return the distributingNode """
        return self.distributingNode

746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782
    def getNodeList(self, role=None):
      node_dict = self.getNodeDict()
      if role is None:
        result = [x for x in node_dict.keys()]
      else:
        result = [node_id for node_id, node_role in node_dict.items() if node_role == role]
      result.sort()
      return result

    def getNodeDict(self):
      nodes = self._nodes
      if isinstance(nodes, tuple):
        new_nodes = OIBTree()
        new_nodes.update([(x, ROLE_PROCESSING) for x in self._nodes])
        self._nodes = nodes = new_nodes
      return nodes

    def registerNode(self, node):
      node_dict = self.getNodeDict()
      if not node_dict.has_key(node):
        if len(node_dict) == 0: # If we are registering the first node, make
                                # it both the distributing node and a processing
                                # node.
          role = ROLE_PROCESSING
          self.distributingNode = node
        else:
          role = ROLE_IDLE
        self.updateNode(node, role)

    def updateNode(self, node, role):
      node_dict = self.getNodeDict()
      node_dict[node] = role

    security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
    def getProcessingNodeList(self):
      return self.getNodeList(role=ROLE_PROCESSING)

783
    security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
784 785
    def getIdleNodeList(self):
      return self.getNodeList(role=ROLE_IDLE)
786

787 788 789 790
    def _isValidNodeName(self, node_name) :
      """Check we have been provided a good node name"""
      return isinstance(node_name, str) and NODE_RE.match(node_name)
      
791 792
    security.declarePublic('manage_setDistributingNode')
    def manage_setDistributingNode(self, distributingNode, REQUEST=None):
793
        """ set the distributing node """   
794
        if not distributingNode or self._isValidNodeName(distributingNode):
795 796 797 798 799 800 801 802 803 804 805 806 807
          self.distributingNode = distributingNode
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
                  urllib.quote("Distributing Node successfully changed."))
        else :
          if REQUEST is not None:
              REQUEST.RESPONSE.redirect(
                  REQUEST.URL1 +
                  '/manageLoadBalancing?manage_tabs_message=' +
                  urllib.quote("Malformed Distributing Node."))

808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865
    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
    def manage_delNode(self, unused_node_list=None, REQUEST=None):
      """ delete selected unused nodes """
      processing_node = self.getDistributingNode()
      updated_processing_node = False
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          if node in node_dict:
            del node_dict[node]
          if node == processing_node:
            self.processing_node = ''
            updated_processing_node = True
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing deleted."
        else:
          message = "Deleted nodes %r." % (unused_node_list, )
        if updated_processing_node:
          message += "Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
    def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
      """ Change one or more idle nodes into processing nodes """
      if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
          self.updateNode(node, ROLE_PROCESSING)
      if REQUEST is not None:
        if unused_node_list is None:
          message = "No unused node selected, nothing done."
        else:
          message = "Nodes now procesing: %r." % (unused_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))

    security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
    def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
      """ Change one or more procesing nodes into idle nodes """
      if processing_node_list is not None:
        node_dict = self.getNodeDict()
        for node in processing_node_list:
          self.updateNode(node, ROLE_IDLE)
      if REQUEST is not None:
        if processing_node_list is None:
          message = "No used node selected, nothing done."
        else:
          message = "Nodes now unused %r." % (processing_node_list, )
        REQUEST.RESPONSE.redirect(
          REQUEST.URL1 +
          '/manageLoadBalancing?manage_tabs_message=' +
          urllib.quote(message))
866

867 868 869 870 871
    def process_shutdown(self, phase, time_in_phase):
        """
          Prevent shutdown from happening while an activity queue is
          processing a batch.
        """
872
        global has_processed_shutdown
873 874
        if phase == 3 and not has_processed_shutdown:
          has_processed_shutdown = True
875 876 877 878
          LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
          is_running_lock.acquire()
          LOG('CMFActivity', INFO, "Shutdown: Activities finished.")

879
    def process_timer(self, tick, interval, prev="", next=""):
880 881 882 883 884 885 886 887
      """
      Call distribute() if we are the Distributing Node and call tic()
      with our node number.
      This method is called by TimerService in the interval given
      in zope.conf. The Default is every 5 seconds.
      """
      # Prevent TimerService from starting multiple threads in parallel
      if timerservice_lock.acquire(0):
888
        try:
889 890 891 892 893 894
          # make sure our skin is set-up. On CMF 1.5 it's setup by acquisition,
          # but on 2.2 it's by traversal, and our site probably wasn't traversed
          # by the timerserver request, which goes into the Zope Control_Panel
          # calling it a second time is a harmless and cheap no-op.
          # both setupCurrentSkin and REQUEST are acquired from containers.
          self.setupCurrentSkin(self.REQUEST)
895
          old_sm = getSecurityManager()
896
          try:
897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913
            # get owner of portal_catalog, so normally we should be able to
            # have the permission to invoke all activities
            user = self.portal_catalog.getWrappedOwner()
            newSecurityManager(self.REQUEST, user)

            currentNode = self.getCurrentNode()
            self.registerNode(currentNode)
            processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

            # only distribute when we are the distributingNode
            if self.getDistributingNode() == currentNode:
              self.distribute(len(processing_node_list))

            # SkinsTool uses a REQUEST cache to store skin objects, as
            # with TimerService we have the same REQUEST over multiple
            # portals, we clear this cache to make sure the cache doesn't
            # contains skins from another portal.
914
            try:
915 916 917 918 919 920 921 922 923 924 925 926 927
              self.getPortalObject().portal_skins.changeSkin(None)
            except AttributeError:
              pass

            # call tic for the current processing_node
            # the processing_node numbers are the indices of the elements
            # in the node tuple +1 because processing_node starts form 1
            if currentNode in processing_node_list:
              self.tic(processing_node_list.index(currentNode) + 1)
          except:
            # Catch ALL exception to avoid killing timerserver.
            LOG('ActivityTool', ERROR, 'process_timer received an exception',
                error=sys.exc_info())
928 929
          finally:
            setSecurityManager(old_sm)
Jérome Perrin's avatar
Jérome Perrin committed
930
        finally:
931
          timerservice_lock.release()
932

Jean-Paul Smets's avatar
Jean-Paul Smets committed
933 934 935 936 937 938
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Initialize if needed
Vincent Pelletier's avatar
Vincent Pelletier committed
939 940
      if not is_initialized:
        self.initialize()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
941 942

      # Call distribute on each queue
943
      for activity in activity_dict.itervalues():
944
        activity.distribute(aq_inner(self), node_count)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
945

Jean-Paul Smets's avatar
Jean-Paul Smets committed
946
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
947
    def tic(self, processing_node=1, force=0):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
948 949
      """
        Starts again an activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
950
        processing_node starts from 1 (there is not node 0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
951
      """
952
      global active_threads
Jean-Paul Smets's avatar
Jean-Paul Smets committed
953 954

      # return if the number of threads is too high
955
      # else, increase the number of active_threads and continue
956 957
      tic_lock.acquire()
      too_many_threads = (active_threads >= max_active_threads)
958
      if not too_many_threads or force:
959
        active_threads += 1
960 961 962
      else:
        tic_lock.release()
        raise RuntimeError, 'Too many threads'
963
      tic_lock.release()
964

Jean-Paul Smets's avatar
Jean-Paul Smets committed
965
      # Initialize if needed
Vincent Pelletier's avatar
Vincent Pelletier committed
966 967
      if not is_initialized:
        self.initialize()
968

969
      inner_self = aq_inner(self)
970

971
      try:
972
        #Sort activity list by priority
973 974 975
        activity_list = sorted(activity_dict.itervalues(),
                               key=lambda activity: activity.getPriority(self))

976
        # Wakeup each queue
977
        for activity in activity_list:
978
          activity.wakeup(inner_self, processing_node)
979

980 981 982 983
        # Process messages on each queue in round robin
        has_awake_activity = 1
        while has_awake_activity:
          has_awake_activity = 0
984
          for activity in activity_list:
985
            if is_running_lock.acquire(0):
986
              try:
987 988
                activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity
                has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
989 990
              finally:
                is_running_lock.release()
991 992 993 994 995
      finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        active_threads -= 1
        tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
996

997
    def hasActivity(self, *args, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
998
      # Check in each queue if the object has deferred tasks
999 1000
      # if not argument is provided, then check on self
      if len(args) > 0:
1001
        obj = args[0]
1002
      else:
1003
        obj = self
1004
      for activity in activity_dict.itervalues():
1005
        if activity.hasActivity(aq_inner(self), obj, **kw):
1006 1007
          return True
      return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1008

1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024
    def getActivityBuffer(self, create_if_not_found=True):
      """
        Get activtity buffer for this thread for this activity tool.
        If no activity buffer is found at lowest level and create_if_not_found
        is True, create one.
        Intermediate level is unconditionaly created if non existant because
        chances are it will be used in the instance life.
        Lock is held when checking for intermediate level existance
        because:
         - intermediate level dict must not be created in 2 threads at the
           same time, since one creation would destroy the existing one.
        It's released after that step because:
         - lower level access is at thread scope, thus by definition there
           can be only one access at a time to a key
         - GIL protects us when accessing python instances
      """
1025 1026
      # Safeguard: make sure we are wrapped in  acquisition context before
      # using our path as an activity tool instance-wide identifier.
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036
      assert getattr(self, 'aq_self', None) is not None
      my_instance_key = self.getPhysicalPath()
      my_thread_key = get_ident()
      global_activity_buffer_lock.acquire()
      try:
        if my_instance_key not in global_activity_buffer:
          global_activity_buffer[my_instance_key] = {}
      finally:
        global_activity_buffer_lock.release()
      thread_activity_buffer = global_activity_buffer[my_instance_key]
1037 1038 1039
      try:
        return thread_activity_buffer[my_thread_key]
      except KeyError:
1040
        if create_if_not_found:
1041
          buffer = ActivityBuffer()
1042 1043 1044
        else:
          buffer = None
        thread_activity_buffer[my_thread_key] = buffer
1045
        return buffer
1046

1047 1048
    security.declarePrivate('activateObject')
    def activateObject(self, object, activity, active_process, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
1049 1050
      if not is_initialized:
        self.initialize()
1051
      self.getActivityBuffer()
1052 1053 1054
      if isinstance(active_process, str):
        active_process = self.unrestrictedTraverse(active_process)
      return ActiveWrapper(object, activity, active_process, kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1055

1056
    def deferredQueueMessage(self, activity, message):
1057 1058
      activity_buffer = self.getActivityBuffer()
      activity_buffer.deferredQueueMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1059

1060
    def deferredDeleteMessage(self, activity, message):
1061 1062
      activity_buffer = self.getActivityBuffer()
      activity_buffer.deferredDeleteMessage(self, activity, message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1063

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1064
    def getRegisteredMessageList(self, activity):
1065
      activity_buffer = self.getActivityBuffer(create_if_not_found=False)
1066
      if activity_buffer is not None:
1067 1068
        #activity_buffer._register() # This is required if flush flush is called outside activate
        return activity.getRegisteredMessageList(activity_buffer,
1069
                                                 aq_inner(self))
1070 1071
      else:
        return []
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1072

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1073
    def unregisterMessage(self, activity, message):
1074 1075 1076
      activity_buffer = self.getActivityBuffer()
      #activity_buffer._register()
      return activity.unregisterMessage(activity_buffer, aq_inner(self), message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1077

1078
    def flush(self, obj, invoke=0, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
1079 1080
      if not is_initialized:
        self.initialize()
1081
      self.getActivityBuffer()
1082 1083
      if isinstance(obj, tuple):
        object_path = obj
1084
      else:
1085
        object_path = obj.getPhysicalPath()
1086
      for activity in activity_dict.itervalues():
1087
        activity.flush(aq_inner(self), object_path, invoke=invoke, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1088

1089
    def start(self, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
1090 1091
      if not is_initialized:
        self.initialize()
1092
      for activity in activity_dict.itervalues():
1093
        activity.start(aq_inner(self), **kw)
1094 1095

    def stop(self, **kw):
Vincent Pelletier's avatar
Vincent Pelletier committed
1096 1097
      if not is_initialized:
        self.initialize()
1098
      for activity in activity_dict.itervalues():
1099
        activity.stop(aq_inner(self), **kw)
1100

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1101
    def invoke(self, message):
1102
      if self.activity_tracking:
1103
        activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
1104
      old_localizer_context = False
1105 1106
      if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extrach base objects.
1107
        base_chain = [aq_base(x) for x in self.aq_chain]
1108 1109 1110
        # Grab existig request (last chain item) and create a copy.
        request_container = base_chain.pop()
        request = request_container.REQUEST
1111 1112 1113 1114 1115 1116 1117 1118
        # Generate PARENTS value. Sadly, we cannot reuse base_chain since
        # PARENTS items must be wrapped in acquisition
        parents = []
        application = self.getPhysicalRoot().aq_base
        for parent in self.aq_chain:
          if parent.aq_base is application:
            break
          parents.append(parent)
1119 1120
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # runing unit tests. Recreate it if it does not exist.
1121 1122
        if getattr(request.other, 'PARENTS', None) is None:
          request.other['PARENTS'] = parents
1123
        # XXX: itools (used by Localizer) requires PATH_INFO to be set, and it's
1124 1125
        # not when runing unit tests. Recreate it if it does not exist.
        if request.environ.get('PATH_INFO') is None:
1126
          request.environ['PATH_INFO'] = '/Control_Panel/timer_service/process_timer'
1127 1128 1129
        
        # restore request information
        new_request = request.clone()
1130
        request_info = message.request_info
1131 1132
        # PARENTS is truncated by clone
        new_request.other['PARENTS'] = parents
1133 1134
        if '_script' in request_info:
          new_request._script = request_info['_script']
1135
        if 'SERVER_URL' in request_info:
1136
          new_request.other['SERVER_URL'] = request_info['SERVER_URL']
1137 1138 1139
        if 'VirtualRootPhysicalPath' in request_info:
          new_request.other['VirtualRootPhysicalPath'] = request_info['VirtualRootPhysicalPath']
        if 'HTTP_ACCEPT_LANGUAGE' in request_info:
1140
          new_request.environ['HTTP_ACCEPT_LANGUAGE'] = request_info['HTTP_ACCEPT_LANGUAGE']
1141 1142
          # Replace Localizer/iHotfix Context, saving existing one
          localizer_context = LocalizerContext(new_request)
1143
          id = get_ident()
1144
          localizer_lock.acquire()
1145
          try:
1146 1147
            old_localizer_context = localizer_contexts.get(id)
            localizer_contexts[id] = localizer_context
1148
          finally:
1149 1150
            localizer_lock.release()
          # Execute Localizer/iHotfix "patch 2"
1151
          new_request.processInputs()
1152 1153

        new_request_container = request_container.__class__(REQUEST=new_request)
1154 1155 1156 1157 1158 1159 1160 1161
        # Recreate acquisition chain.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
          my_self = item.__of__(my_self)
      else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
1162 1163 1164
      try:
        message(my_self)
      finally:
1165 1166 1167 1168
        if my_self is not self: # We rewrapped self
          # Restore default skin selection
          skinnable = self.getPortalObject()
          skinnable.changeSkin(skinnable.getSkinNameFromRequest(request))
1169 1170
        if old_localizer_context is not False:
          # Restore Localizer/iHotfix context
1171
          id = get_ident()
1172
          localizer_lock.acquire()
1173
          try:
1174 1175
            if old_localizer_context is None:
              del localizer_contexts[id]
1176
            else:
1177
              localizer_contexts[id] = old_localizer_context
1178
          finally:
1179
            localizer_lock.release()
1180
      if self.activity_tracking:
1181
        activity_tracking_logger.info('invoked message')
1182 1183 1184
      if my_self is not self: # We rewrapped self
        for held in my_self.REQUEST._held:
          self.REQUEST._hold(held)
1185

1186
    def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
1187
      if self.activity_tracking:
1188 1189 1190
        activity_tracking_logger.info(
          'invoking group messages: method_id=%s, paths=%s'
          % (method_id, ['/'.join(m.object_path) for m in message_list]))
1191
      # Invoke a group method.
1192
      message_dict = {}
1193
      path_set = set()
1194 1195 1196
      # Filter the list of messages. If an object is not available, mark its
      # message as non-executable. In addition, expand an object if necessary,
      # and make sure that no duplication happens.
1197
      for m in message_list:
1198 1199
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
1200 1201
        try:
          obj = m.getObject(self)
1202
        except KeyError:
1203 1204 1205 1206
          LOG('CMFActivity', ERROR,
              'Message failed in getting an object from the path %r' % \
                  (m.object_path,),
              error=sys.exc_info())
1207
          m.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=self)
1208 1209
          continue
        try:
1210
          if m.hasExpandMethod():
1211
            subobject_list = m.getObjectList(self)
1212
          else:
1213
            subobject_list = (obj,)
1214
          message_dict[m] = expanded_object_list = []
1215
          for subobj in subobject_list:
1216 1217 1218 1219
            if merge_duplicate:
              path = subobj.getPath()
              if path in path_set:
                continue
1220
              path_set.add(path)
1221 1222 1223 1224 1225 1226 1227 1228 1229 1230
            if alternate_method_id is not None \
               and hasattr(aq_base(subobj), alternate_method_id):
              # if this object is alternated,
              # generate a new single active object
              activity_kw = m.activity_kw.copy()
              activity_kw.pop('group_method_id', None)
              activity_kw.pop('group_id', None)
              active_obj = subobj.activate(activity=activity, **activity_kw)
              getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
            else:
1231
              expanded_object_list.append([subobj, m.args, m.kw, None])
1232
        except:
1233
          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
1234

1235
      expanded_object_list = sum(message_dict.itervalues(), [])
1236 1237
      try:
        if len(expanded_object_list) > 0:
1238
          traverse = self.getPortalObject().unrestrictedTraverse
1239
          # FIXME: how to apply security here?
1240 1241 1242 1243
          # NOTE: expanded_object_list[*][3] must be updated by the callee:
          #       it must be deleted in case of failure, or updated with the
          #       result to post on the active process otherwise.
          traverse(method_id)(expanded_object_list)
1244 1245
      except:
        # In this case, the group method completely failed.
1246
        exc_info = sys.exc_info()
1247
        for m in message_dict:
1248
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
1249
        LOG('WARNING ActivityTool', 0,
1250
            'Could not call method %s on objects %s' %
1251
            (method_id, [x[0] for x in expanded_object_list]), error=exc_info)
1252 1253 1254
        error_log = getattr(self, 'error_log', None)
        if error_log is not None:
          error_log.raising(exc_info)
1255
      else:
1256 1257 1258 1259 1260 1261 1262 1263
        # Note there can be partial failures.
        for m, expanded_object_list in message_dict.iteritems():
          result_list = []
          for result in expanded_object_list:
            if len(result) != 4:
              break # message marked as failed by the group_method_id
            elif result[3] is not None:
              result_list.append(result)
1264 1265
          else:
            try:
1266 1267 1268 1269
              if result_list and m.active_process:
                active_process = traverse(m.active_process)
                for result in result_list:
                  m.activateResult(active_process, result[3], result[0])
1270
            except:
1271
              pass
1272
            else:
1273
              m.setExecutionState(MESSAGE_EXECUTED, context=self)
1274 1275
              continue
          m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
1276
      if self.activity_tracking:
1277
        activity_tracking_logger.info('invoked group messages')
1278

1279 1280 1281 1282 1283
    security.declarePrivate('dummyGroupMethod')
    class dummyGroupMethod(object):
      def __bobo_traverse__(self, REQUEST, method_id):
        def group_method(message_list):
          for m in message_list:
1284
            m[3] = getattr(m[0], method_id)(*m[1], **m[2])
1285 1286 1287
        return group_method
    dummyGroupMethod = dummyGroupMethod()

1288 1289
    def newMessage(self, activity, path, active_process,
                   activity_kw, method_id, *args, **kw):
1290
      # Some Security Cheking should be made here XXX
Vincent Pelletier's avatar
Vincent Pelletier committed
1291 1292
      if not is_initialized:
        self.initialize()
1293
      self.getActivityBuffer()
1294
      activity_dict[activity].queueMessage(aq_inner(self),
1295
        Message(path, active_process, activity_kw, method_id, args, kw))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1296

1297
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1298 1299 1300 1301 1302 1303
    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
1304
      self.flush(object_path,method_id=method_id,invoke=1)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1305
      if REQUEST is not None:
1306 1307
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1308

1309
    security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1310 1311 1312 1313 1314 1315
    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
1316
      self.flush(object_path,method_id=method_id,invoke=0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1317
      if REQUEST is not None:
1318 1319
        return REQUEST.RESPONSE.redirect('%s/%s' %
                (self.absolute_url(), 'manageActivities'))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1320

1321 1322
    security.declareProtected( CMFCorePermissions.ManagePortal,
                               'manageClearActivities' )
1323
    def manageClearActivities(self, keep=1, REQUEST=None):
1324 1325 1326
      """
        Clear all activities and recreate tables.
      """
1327
      folder = self.getPortalObject().portal_skins.activity
1328

1329
      # Obtain all pending messages.
1330
      message_list_dict = {}
1331
      if keep:
1332
        for activity in activity_dict.itervalues():
1333 1334
          if hasattr(activity, 'dumpMessageList'):
            try:
1335 1336
              message_list_dict[activity.__class__.__name__] =\
                                    activity.dumpMessageList(self)
1337 1338 1339
            except ConflictError:
              raise
            except:
1340 1341 1342
              LOG('ActivityTool', WARNING,
                  'could not dump messages from %s' %
                  (activity,), error=sys.exc_info())
1343 1344

      if getattr(folder, 'SQLDict_createMessageTable', None) is not None:
1345 1346 1347 1348 1349
        try:
          folder.SQLDict_dropMessageTable()
        except ConflictError:
          raise
        except:
1350
          LOG('CMFActivity', WARNING,
1351
              'could not drop the message table',
1352 1353 1354
              error=sys.exc_info())
        folder.SQLDict_createMessageTable()

1355
      if getattr(folder, 'SQLQueue_createMessageTable', None) is not None:
1356 1357 1358 1359 1360
        try:
          folder.SQLQueue_dropMessageTable()
        except ConflictError:
          raise
        except:
1361
          LOG('CMFActivity', WARNING,
1362
              'could not drop the message queue table',
1363 1364 1365
              error=sys.exc_info())
        folder.SQLQueue_createMessageTable()

1366
      # Reactivate the messages.
1367 1368 1369 1370 1371 1372 1373 1374 1375 1376
      for activity, message_list in message_list_dict.iteritems():
        for m in message_list:
          try:
            m.reactivate(aq_inner(self), activity=activity)
          except ConflictError:
            raise
          except:
            LOG('ActivityTool', WARNING,
                'could not reactivate the message %r, %r' %
                (m.object_path, m.method_id), error=sys.exc_info())
1377

1378
      if REQUEST is not None:
1379 1380 1381 1382 1383 1384
        message = 'Activities%20Cleared'
        if keep:
          message = 'Tables%20Recreated'
        return REQUEST.RESPONSE.redirect(
            '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
              self.absolute_url(), message))
1385

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1386
    security.declarePublic('getMessageList')
1387
    def getMessageList(self,**kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1388 1389 1390
      """
        List messages waiting in queues
      """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1391
      # Initialize if needed
Vincent Pelletier's avatar
Vincent Pelletier committed
1392 1393
      if not is_initialized:
        self.initialize()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1394

Jean-Paul Smets's avatar
Jean-Paul Smets committed
1395
      message_list = []
1396
      for activity in activity_dict.itervalues():
Sebastien Robin's avatar
Sebastien Robin committed
1397
        try:
1398
          message_list += activity.getMessageList(aq_inner(self),**kw)
Sebastien Robin's avatar
Sebastien Robin committed
1399 1400
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,activity)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1401 1402
      return message_list

1403 1404 1405 1406 1407 1408
    security.declarePublic('countMessageWithTag')
    def countMessageWithTag(self, value):
      """
        Return the number of messages which match the given tag.
      """
      message_count = 0
1409
      for activity in activity_dict.itervalues():
1410
        message_count += activity.countMessageWithTag(aq_inner(self), value)
Sebastien Robin's avatar
Sebastien Robin committed
1411 1412 1413 1414 1415 1416 1417 1418 1419 1420
      return message_count

    security.declarePublic('countMessage')
    def countMessage(self, **kw):
      """
        Return the number of messages which match the given parameter.

        Parameters allowed:

        method_id : the id of the method
Jérome Perrin's avatar
Jérome Perrin committed
1421
        path : for activities on a particular object
Sebastien Robin's avatar
Sebastien Robin committed
1422 1423 1424 1425
        tag : activities with a particular tag
        message_uid : activities with a particular uid
      """
      message_count = 0
1426
      for activity in activity_dict.itervalues():
1427
        message_count += activity.countMessage(aq_inner(self), **kw)
1428 1429
      return message_count

1430
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
1431 1432 1433 1434 1435 1436 1437
    def newActiveProcess(self, REQUEST=None, **kw):
      # note: if one wants to create an Actice Process without ERP5 products,
      # she can call ActiveProcess.addActiveProcess
      obj = self.newContent(portal_type="Active Process", **kw)
      if REQUEST is not None:
        REQUEST['RESPONSE'].redirect( 'manage_main' )
      return obj
1438

1439
    # Active synchronisation methods
1440
    security.declarePrivate('validateOrder')
1441
    def validateOrder(self, message, validator_id, validation_value):
1442 1443 1444 1445 1446
      message_list = self.getDependentMessageList(message, validator_id, validation_value)
      return len(message_list) > 0

    security.declarePrivate('getDependentMessageList')
    def getDependentMessageList(self, message, validator_id, validation_value):
Vincent Pelletier's avatar
Vincent Pelletier committed
1447 1448
      if not is_initialized:
        self.initialize()
1449
      message_list = []
Vincent Pelletier's avatar
Vincent Pelletier committed
1450
      method_id = "_validate_%s" % validator_id
1451
      for activity in activity_dict.itervalues():
1452 1453 1454 1455 1456 1457
        method = getattr(activity, method_id, None)
        if method is not None:
          result = method(aq_inner(self), message, validation_value)
          if result:
            message_list.extend([(activity, m) for m in result])
      return message_list
1458

Yoshinori Okuji's avatar
Yoshinori Okuji committed
1459 1460
    # Required for tests (time shift)
    def timeShift(self, delay):
Vincent Pelletier's avatar
Vincent Pelletier committed
1461 1462
      if not is_initialized:
        self.initialize()
1463
      for activity in activity_dict.itervalues():
1464
        activity.timeShift(aq_inner(self), delay)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1465

1466
InitializeClass(ActivityTool)