ActivityTool.py 15.3 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

from Products.CMFCore import CMFCorePermissions
30
from Products.ERP5Type.Document.Folder import Folder
31
from Products.PythonScripts.Utility import allow_class
Jean-Paul Smets's avatar
Jean-Paul Smets committed
32 33
from AccessControl import ClassSecurityInfo
from Products.CMFCore.utils import UniqueObject, _checkPermission, _getAuthenticatedUser
34
from Globals import InitializeClass, DTMLFile, get_request
Jean-Paul Smets's avatar
Jean-Paul Smets committed
35 36
from Acquisition import aq_base
from DateTime.DateTime import DateTime
37
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
38
from ActivityBuffer import ActivityBuffer
Jean-Paul Smets's avatar
Jean-Paul Smets committed
39 40 41 42 43 44 45 46 47
import threading

from zLOG import LOG

# Module-level (RAM) state is used instead of instance properties so that
# nothing below is stored in the ZODB (and so that a restart starts clean).
# Number of threads currently inside ActivityTool.tic()
active_threads = 0
max_active_threads = 1 # 2 will cause more bugs to appear (he he)
# Set to 1 by ActivityTool.initialize() once every activity was initialized
is_initialized = 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
48
# Held by ActivityTool.tic() while it increments / decrements active_threads
tic_lock = threading.Lock() # A RAM based lock
Jean-Paul Smets's avatar
Jean-Paul Smets committed
49 50 51 52 53 54 55 56 57 58 59 60 61

# Activity Registration
# Both containers are filled by registerActivity():
#   activity_dict maps an activity class name to its instance
#   activity_list holds the same instances in registration order
activity_dict = {}
activity_list = []

def registerActivity(activity):
  # Instantiate the given activity class once and record that instance both
  # in activity_list and, under the class name, in activity_dict.
  # XXX Must be rewritten to register the class and create an instance
  # for each activity
  name = activity.__name__
  LOG('Init Activity', 0, str(name))
  instance = activity()
  activity_list.append(instance)
  activity_dict[name] = instance

Jean-Paul Smets's avatar
Jean-Paul Smets committed
62 63
class Result:
  # Plain record describing the outcome of a method call on an object.
  # (Documented with comments on purpose: in Zope a docstring makes an
  # object publishable.)

  def __init__(self, object_or_path, method_id, result, log_title=None, log_id=None, log_message=None):
    # object_or_path may be:
    #   - a list or tuple of path elements,
    #   - a '/'-separated path string,
    #   - any object providing getPhysicalPath().
    # isinstance() is used so that subclasses of list / tuple / str are
    # recognised too instead of falling through to getPhysicalPath().
    # Some utility function to do this would be useful since we use it everywhere XXX
    if isinstance(object_or_path, (list, tuple)):
      path = object_or_path
      url = '/'.join(object_or_path)
    elif isinstance(object_or_path, str):
      path = object_or_path.split('/')
      url = object_or_path
    else:
      path = object_or_path.getPhysicalPath()
      url = '/'.join(path)
    self.object_path = path           # Path as a sequence of elements
    self.object_url = url             # Same path as a '/'-joined string
    self.method_id = method_id
    self.result = result              # Include arbitrary result
    self.log_title = log_title        # Should follow Zope convention for LOG title
    self.log_id = log_id              # Should follow Zope convention for LOG ids
    self.log_message = log_message    # Should follow Zope convention for LOG message
Jean-Paul Smets's avatar
Jean-Paul Smets committed
82

Jean-Paul Smets's avatar
Jean-Paul Smets committed
83 84
# NOTE(review): allow_class is imported from Products.PythonScripts.Utility;
# presumably this makes Result usable from restricted Python Scripts - confirm.
allow_class(Result)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
85
class Message:
  # A recorded method call: the path of the target object, the method id and
  # its arguments, plus the activity keywords and the optional active process.
  # Instances are callable: message(activity_tool) performs the call.
  # (Documented with comments on purpose: in Zope a docstring makes an
  # object publishable.)

  def __init__(self, object, active_process, activity_kw, method_id, args, kw):
    # object: a '/'-separated path string or an object providing
    # getPhysicalPath()
    if isinstance(object, str):
      self.object_path = object.split('/')
    else:
      self.object_path = object.getPhysicalPath()
    # active_process: None, a '/'-separated path string, or an object
    # providing getPhysicalPath() and getUid()
    if isinstance(active_process, str):
      self.active_process = active_process.split('/')
    elif active_process is None:
      self.active_process = None
    else:
      self.active_process = active_process.getPhysicalPath()
      # NOTE: active_process_uid is only defined in this branch, i.e. when
      # an active process object (not a path, not None) was given.
      self.active_process_uid = active_process.getUid()
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
    self.is_executed = 0
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info ?

  def __call__(self, activity_tool):
    # Execute the recorded call, resolving the target from activity_tool.
    # The former try/except around this body was disabled (it had been
    # turned into "if 1: ... else: ..."); the unreachable failure branch is
    # removed here. As before, any exception propagates to the caller and
    # is_executed is then left untouched.
    LOG('WARNING ActivityTool', 0,
         'Trying to call method %s on object %s' % (self.method_id, self.object_path))
    obj = activity_tool.unrestrictedTraverse(self.object_path)
    # Change user if required (TO BE DONE)
    # self.activity_kw
    REQUEST = get_request()
    # Published on the request so that ActivityTool.getActiveProcess() can
    # find the active process while the method runs.
    REQUEST.active_process = self.active_process
    result = getattr(obj, self.method_id)(*self.args, **self.kw)
    if REQUEST.active_process is not None:
      active_process = activity_tool.getActiveProcess()
      active_process.activateResult(Result(obj, self.method_id, result)) # XXX Allow other method_id in future
    self.is_executed = 1

  def validate(self, activity, activity_tool):
    # Delegate validation to the activity, passing the activity keywords.
    return activity.validate(activity_tool, self, **self.activity_kw)

  def notifyUser(self, activity_tool, message="Failed Processing Activity"):
    # Send a mail built from `message`, the target path and the method id
    # through activity_tool.MailHost.
    # NOTE(review): the recipient is read from portal_memberdata itself, not
    # from the member named by self.user_name - confirm this is intended.
    user_email = activity_tool.portal_memberdata.getProperty('email')
    mail_text = """From: %s
To: %s
Subject: %s

%s

Document: %s
Method: %s
    """ % (activity_tool.email_from_address, user_email,
           message, message, '/'.join(self.object_path), self.method_id)
    activity_tool.MailHost.send( mail_text )

Jean-Paul Smets's avatar
Jean-Paul Smets committed
145 146
class Method:
  # Callable stand-in for one method of an object: calling it does not run
  # the method but records the call as a Message and queues that message on
  # the activity chosen at construction time.
  # (Documented with comments on purpose: in Zope a docstring makes an
  # object publishable.)

  def __init__(self, passive_self, activity, active_process, kw, method_id):
    self.__method_id = method_id            # id of the method to call later
    self.__kw = kw                          # activity keywords
    self.__active_process = active_process
    self.__activity = activity              # key into activity_dict
    self.__passive_self = passive_self      # object the method belongs to

  def __call__(self, *args, **kw):
    target = self.__passive_self
    message = Message(target, self.__active_process, self.__kw,
                      self.__method_id, args, kw)
    queue = activity_dict[self.__activity]
    queue.queueMessage(target.portal_activities, message)

158 159
# NOTE(review): allow_class is imported from Products.PythonScripts.Utility;
# presumably this makes Method usable from restricted Python Scripts - confirm.
allow_class(Method)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
160 161
class ActiveWrapper:
  # Proxy around an object: every attribute that is not found on the wrapper
  # itself is answered with a Method bound to the wrapped object, the
  # activity name, the active process and the activity keywords.
  # (Documented with comments on purpose: in Zope a docstring makes an
  # object publishable.)

  def __init__(self, passive_self, activity, active_process, **kw):
    # The state is written straight into __dict__ under literal keys.
    self.__dict__.update({
      '__passive_self': passive_self,
      '__activity': activity,
      '__active_process': active_process,
      '__kw': kw,
    })

  def __getattr__(self, method_id):
    # Only reached for names missing from the instance and the class.
    state = self.__dict__
    return Method(state['__passive_self'], state['__activity'],
                  state['__active_process'], state['__kw'], method_id)

173
class ActivityTool (Folder, UniqueObject):
    """
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transaction)
    """
    # NOTE: methods below are documented with comments unless they already
    # had a docstring: in Zope a docstring makes a method publishable, so
    # adding one would change what can be called through the web.
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
    portal_type = 'Activity Tool'
    allowed_types = ( 'CMF Active Process', )
    security = ClassSecurityInfo()

    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
                     ,
                     ] + list(Folder.manage_options))

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )

    def __init__(self):
      return Folder.__init__(self, ActivityTool.id)

    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
      # Filters the list of available meta types: only the meta types whose
      # name is listed in allowed_types are returned. `user` is not used.
      return [meta_type for meta_type in self.all_meta_types()
              if meta_type['name'] in self.allowed_types]

    def initialize(self):
      # Initialize every registered activity and remember (module-wide) that
      # this has been done.
      global is_initialized
      # NOTE(review): imported for its side effects only; presumably these
      # modules call registerActivity() when imported - confirm.
      from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
      # Initialize each queue
      for activity in activity_list:
        activity.initialize(self)
      is_initialized = 1

    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Initialize if needed
      if not is_initialized: self.initialize()

      # Call distribute on each queue. The former per-activity try/except
      # was disabled, so an error in one activity propagates to the caller.
      for activity in activity_list:
        activity.distribute(self, node_count)

    security.declarePublic('tic')
    def tic(self, processing_node=1, force=0):
      """
        Starts again an activity
        processing_node starts from 1 (there is not node 0)
      """
      global active_threads

      # return if the number of threads is too high
      # NOTE(review): this test is made outside tic_lock, so two threads may
      # both pass it before either one increments the counter.
      if active_threads >= max_active_threads:
        if not force: return 'Too many threads'

      if tic_lock is None:
        return

      # Initialize if needed
      if not is_initialized: self.initialize()

      # increase the number of active_threads
      tic_lock.acquire()
      try:
        active_threads += 1
      finally:
        tic_lock.release()

      try:
        # Wakeup each queue
        for activity in activity_list:
          activity.wakeup(self, processing_node)

        # Process messages on each queue in round robin
        has_awake_activity = 1
        while has_awake_activity:
          has_awake_activity = 0
          for activity in activity_list:
            activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
            has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
      finally:
        # decrease the number of active_threads, even when an activity
        # raised: otherwise the counter would stay at its maximum and every
        # later unforced tic would return 'Too many threads'.
        tic_lock.acquire()
        try:
          active_threads -= 1
        finally:
          tic_lock.release()

    def hasActivity(self, *args, **kw):
      # Check in each queue if the object has deferred tasks
      # if no argument is provided, then check on self
      # Returns 1 as soon as one activity reports something, 0 otherwise.
      if len(args) > 0:
        object = args[0]
      else:
        object = self
      for activity in activity_list:
        if activity.hasActivity(self, object, **kw):
          return 1
      return 0

    def _getActivityBuffer(self):
      # Return the activity buffer kept in _v_activity_buffer, creating it
      # on first use. Before, only activate() created it, so the methods
      # below raised AttributeError when called on a tool without a buffer.
      if not hasattr(self, '_v_activity_buffer'):
        self._v_activity_buffer = ActivityBuffer()
      return self._v_activity_buffer

    def activate(self, object, activity, active_process, **kw):
      # Return a wrapper whose method calls are queued on `activity`
      # (a key of activity_dict) instead of being executed.
      if not is_initialized: self.initialize()
      self._getActivityBuffer()
      return ActiveWrapper(object, activity, active_process, **kw)

    def deferredQueueMessage(self, activity, message):
      self._getActivityBuffer().deferredQueueMessage(self, activity, message)

    def deferredDeleteMessage(self, activity, message):
      self._getActivityBuffer().deferredDeleteMessage(self, activity, message)

    def getRegisteredMessageList(self, activity):
      return activity.getRegisteredMessageList(self._getActivityBuffer(), self)

    def unregisterMessage(self, activity, message):
      return activity.unregisterMessage(self._getActivityBuffer(), self, message)

    def flush(self, object, invoke=0, **kw):
      # Flush, on every activity, the messages of `object`; `invoke` and the
      # extra keywords are handed over to each activity unchanged.
      if not is_initialized: self.initialize()
      object_path = object.getPhysicalPath()
      for activity in activity_list:
        LOG('CMFActivity: ', 0, 'flushing activity %s' % activity.__class__.__name__)
        activity.flush(self, object_path, invoke=invoke, **kw)

    def start(self, **kw):
      # Call start() on every registered activity.
      if not is_initialized: self.initialize()
      for activity in activity_list:
        LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__)
        activity.start(self, **kw)

    def stop(self, **kw):
      # Call stop() on every registered activity.
      if not is_initialized: self.initialize()
      for activity in activity_list:
        LOG('CMFActivity: ', 0, 'stopping activity %s' % activity.__class__.__name__)
        activity.stop(self, **kw)

    def invoke(self, message):
      # Execute a message immediately (messages are callable).
      message(self)

    def newMessage(self, activity, path, active_process, activity_kw, method_id, *args, **kw):
      # Build a Message from its parts and queue it on `activity`
      # (a key of activity_dict).
      # Some Security Cheking should be made here XXX
      if not is_initialized: self.initialize()
      activity_dict[activity].queueMessage(self, Message(path, active_process, activity_kw, method_id, args, kw))

    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"
      """
      if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
      for activity in activity_list:
        try:
          activity.flush(self, object_path, method_id=method_id, invoke=1)
        except AttributeError:
          LOG('CMFActivity.manageInvoke, Warning, could not flush activity on:',0,activity)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))

    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"
      """
      if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
      for activity in activity_list:
        try:
          activity.flush(self, object_path, method_id=method_id, invoke=0)
        except AttributeError:
          LOG('CMFActivity.manageCancel, Warning, could not flush activity on:',0,activity)
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))

    security.declarePublic('getMessageList')
    def getMessageList(self):
      """
        List messages waiting in queues
      """
      # Initialize if needed
      if not is_initialized: self.initialize()

      message_list = []
      for activity in activity_list:
        try:
          message_list += activity.getMessageList(self)
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,activity)
      return message_list

    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
    def newActiveProcess(self, **kw):
      # Create an active process inside the tool under a generated id, edit
      # it with the given keywords and return it.
      from ActiveProcess import addActiveProcess
      new_id = str(self.generateNewId())
      addActiveProcess(self, new_id)
      active_process = self._getOb(new_id)
      active_process.edit(**kw)
      return active_process

    def reindexObject(self):
      self.immediateReindexObject()

    def getActiveProcess(self):
      # Return the active process whose path was stored on the request by
      # Message.__call__, or None. getattr() with a default is used because
      # the attribute does not exist when no message is being executed.
      REQUEST = get_request()
      active_process = getattr(REQUEST, 'active_process', None)
      if active_process:
        return self.unrestrictedTraverse(active_process)
      return None

418
# NOTE(review): InitializeClass is imported from Globals; presumably it applies
# the declarations collected in ActivityTool.security to the class - confirm.
InitializeClass(ActivityTool)