ActivityTool.py 15.7 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
Jean-Paul Smets's avatar
Jean-Paul Smets committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

from Products.CMFCore import CMFCorePermissions
30
from Products.ERP5Type.Document.Folder import Folder
31
from Products.PythonScripts.Utility import allow_class
Jean-Paul Smets's avatar
Jean-Paul Smets committed
32 33
from AccessControl import ClassSecurityInfo
from Products.CMFCore.utils import UniqueObject, _checkPermission, _getAuthenticatedUser
34
from Globals import InitializeClass, DTMLFile, get_request
Jean-Paul Smets's avatar
Jean-Paul Smets committed
35 36
from Acquisition import aq_base
from DateTime.DateTime import DateTime
37
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
38
from ActivityBuffer import ActivityBuffer
Jean-Paul Smets's avatar
Jean-Paul Smets committed
39 40 41 42 43 44 45 46 47
import threading

from zLOG import LOG

# Using a RAM property (not a property of an instance) allows
# to prevent from storing a state in the ZODB (and allows to restart...)
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)
is_initialized = 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
48
tic_lock = threading.Lock() # A RAM based lock
Jean-Paul Smets's avatar
Jean-Paul Smets committed
49 50 51 52 53 54 55 56 57 58 59 60 61

# Activity Registration
activity_dict = {}
activity_list = []

def registerActivity(activity):
  # Must be rewritten to register
  # class and create instance for each activity
  LOG('Init Activity', 0, str(activity.__name__))
  activity_instance = activity()
  activity_list.append(activity_instance)
  activity_dict[activity.__name__] = activity_instance

Jean-Paul Smets's avatar
Jean-Paul Smets committed
62 63
class Result:

64
  def __init__(self, object_or_path, method_id, result, log_title=None, log_id=None, log_message=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
65 66 67 68 69 70 71 72 73 74
    # Some utility function to do this would be useful since we use it everywhere XXX
    if type(object_or_path) in (type([]), type(())):
      url = '/'.join(object_or_path)
      path = object_or_path
    elif type(object_or_path) is type('a'):
      path = object_or_path.split('/')
      url = object_or_path
    else:
      path = object_or_path.getPhysicalPath()
      url = '/'.join(path)
75 76 77 78 79 80 81
    self.object_path = path
    self.object_url = url
    self.method_id = method_id
    self.result = result              # Include arbitrary result
    self.log_title = log_title        # Should follow Zope convention for LOG title
    self.log_id = log_id              # Should follow Zope convention for LOG ids
    self.log_message = log_message    # Should follow Zope convention for LOG message
Jean-Paul Smets's avatar
Jean-Paul Smets committed
82

Jean-Paul Smets's avatar
Jean-Paul Smets committed
83 84
allow_class(Result)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
85
class Message:
86
  
87
  def __init__(self, object, active_process, activity_kw, method_id, args, kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
88 89 90 91
    if type(object) is type('a'):
      self.object_path = object.split('/')
    else:
      self.object_path = object.getPhysicalPath()
92 93 94 95 96 97
    if type(active_process) is type('a'):
      self.active_process = active_process.split('/')
    elif active_process is None:
      self.active_process = None
    else:
      self.active_process = active_process.getPhysicalPath()
98
      self.active_process_uid = active_process.getUid()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
99 100 101 102
    self.activity_kw = activity_kw
    self.method_id = method_id
    self.args = args
    self.kw = kw
Jean-Paul Smets's avatar
Jean-Paul Smets committed
103
    self.is_executed = 0
104 105
    self.user_name = str(_getAuthenticatedUser(self))
    # Store REQUEST Info ?
Jean-Paul Smets's avatar
Jean-Paul Smets committed
106 107

  def __call__(self, activity_tool):
108
    try:
109 110
      LOG('WARNING ActivityTool', 0,
           'Trying to call method %s on object %s' % (self.method_id, self.object_path))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
111
      object = activity_tool.unrestrictedTraverse(self.object_path)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
112 113
      # Change user if required (TO BE DONE)
      # self.activity_kw
Jean-Paul Smets's avatar
Jean-Paul Smets committed
114
      activity_tool._v_active_process = self.active_process # Store the active_process as volatile thread variable
115
      result = getattr(object, self.method_id)(*self.args, **self.kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
116
      if activity_tool._v_active_process is not None:
117
        active_process = activity_tool.getActiveProcess()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
118
        active_process.activateResult(Result(object,self.method_id,result)) # XXX Allow other method_id in future
Jean-Paul Smets's avatar
Jean-Paul Smets committed
119
      self.is_executed = 1
120
    except:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
121
      self.is_executed = 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
122 123 124 125 126 127
      LOG('WARNING ActivityTool', 0,
           'Could not call method %s on object %s' % (self.method_id, self.object_path))

  def validate(self, activity, activity_tool):
    return activity.validate(activity_tool, self, **self.activity_kw)

128
  def notifyUser(self, activity_tool, message="Failed Processing Activity"):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
129 130 131 132 133 134
    #LOG('notifyUser begin', 0, str(self.user_name))
    user_email = activity_tool.portal_membership.getMemberById(self.user_name).getProperty('email')
    if user_email in ('', None):
      user_email = activity_tool.email_from_address
    #LOG('notifyUser user_email', 0, str(user_email))
    mail_text = """From: %s
135 136 137 138 139 140 141 142 143
To: %s
Subject: %s

%s

Document: %s
Method: %s
    """ % (activity_tool.email_from_address, user_email,
           message, message, '/'.join(self.object_path), self.method_id)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
144 145 146
    #LOG('notifyUser mail_text', 0, str(mail_text))
    activity_tool.MailHost.send( mail_text )
    #LOG('notifyUser send', 0, '')
147

Jean-Paul Smets's avatar
Jean-Paul Smets committed
148 149
class Method:

150
  def __init__(self, passive_self, activity, active_process, kw, method_id):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
151 152
    self.__passive_self = passive_self
    self.__activity = activity
153
    self.__active_process = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
154 155 156 157
    self.__kw = kw
    self.__method_id = method_id

  def __call__(self, *args, **kw):
158
    m = Message(self.__passive_self, self.__active_process, self.__kw, self.__method_id, args, kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
159 160
    activity_dict[self.__activity].queueMessage(self.__passive_self.portal_activities, m)

161 162
allow_class(Method)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
163 164
class ActiveWrapper:

165
  def __init__(self, passive_self, activity, active_process, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
166 167
    self.__dict__['__passive_self'] = passive_self
    self.__dict__['__activity'] = activity
168
    self.__dict__['__active_process'] = active_process
Jean-Paul Smets's avatar
Jean-Paul Smets committed
169 170 171 172
    self.__dict__['__kw'] = kw

  def __getattr__(self, id):
    return Method(self.__dict__['__passive_self'], self.__dict__['__activity'],
173
                  self.__dict__['__active_process'],
Jean-Paul Smets's avatar
Jean-Paul Smets committed
174 175
                  self.__dict__['__kw'], id)

176
class ActivityTool (Folder, UniqueObject):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
177
    """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
178 179 180 181 182 183 184 185 186 187 188 189
    ActivityTool is the central point for activity management.

    Improvement to consider to reduce locks:

      Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
              thus allowing all SQL transaction to happen in a very short time
              (this would also be a great way of using MyISAM tables)

      Idea 2: do the same at the level of ActivityTool

      Idea 3: do the same at the level of each activity (ie. queueMessage
              accumulates and fires messages at the end of the transactino)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
190 191 192
    """
    id = 'portal_activities'
    meta_type = 'CMF Activity Tool'
193
    portal_type = 'Activity Tool'
194
    allowed_types = ( 'CMF Active Process', )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
195 196
    security = ClassSecurityInfo()

197 198
    manage_options = tuple(
                     [ { 'label' : 'Overview', 'action' : 'manage_overview' }
Jean-Paul Smets's avatar
Jean-Paul Smets committed
199 200
                     , { 'label' : 'Activities', 'action' : 'manageActivities' }
                     ,
201
                     ] + list(Folder.manage_options))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
202 203 204 205

    security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
    manageActivities = DTMLFile( 'dtml/manageActivities', globals() )

206 207 208 209 210
    security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
    manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )

    def __init__(self):
        return Folder.__init__(self, ActivityTool.id)
211
    
212 213 214 215 216 217 218 219 220 221
    # Filter content (ZMI))
    def filtered_meta_types(self, user=None):
        # Filters the list of available meta types.
        all = ActivityTool.inheritedAttribute('filtered_meta_types')(self)
        meta_types = []
        for meta_type in self.all_meta_types():
            if meta_type['name'] in self.allowed_types:
                meta_types.append(meta_type)
        return meta_types

Jean-Paul Smets's avatar
Jean-Paul Smets committed
222 223
    def initialize(self):
      global is_initialized
Sebastien Robin's avatar
Sebastien Robin committed
224
      from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
Jean-Paul Smets's avatar
Jean-Paul Smets committed
225 226 227 228 229
      # Initialize each queue
      for activity in activity_list:
        activity.initialize(self)
      is_initialized = 1

Jean-Paul Smets's avatar
Jean-Paul Smets committed
230 231 232 233 234 235 236 237 238 239
    security.declarePublic('distribute')
    def distribute(self, node_count=1):
      """
        Distribute load
      """
      # Initialize if needed
      if not is_initialized: self.initialize()

      # Call distribute on each queue
      for activity in activity_list:
240
        try:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
241
          activity.distribute(self, node_count)
242
        except:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
243 244
          LOG('CMFActivity:', 100, 'Core call to distribute failed for activity %s' % activity)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
245
    security.declarePublic('tic')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
246
    def tic(self, processing_node=1, force=0):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
247 248
      """
        Starts again an activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
249
        processing_node starts from 1 (there is not node 0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
250 251 252 253
      """
      global active_threads, is_initialized

      # return if the number of threads is too high
Jean-Paul Smets's avatar
Jean-Paul Smets committed
254
      if active_threads >= max_active_threads:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
255 256
        if not force: return 'Too many threads'

Jean-Paul Smets's avatar
Jean-Paul Smets committed
257
      if tic_lock is None:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
258 259 260 261 262 263
        return

      # Initialize if needed
      if not is_initialized: self.initialize()

      # increase the number of active_threads
Jean-Paul Smets's avatar
Jean-Paul Smets committed
264
      tic_lock.acquire()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
265
      active_threads += 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
266
      tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
267 268 269

      # Wakeup each queue
      for activity in activity_list:
Sebastien Robin's avatar
Sebastien Robin committed
270 271
        if 1:
        #try:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
272
          activity.wakeup(self, processing_node)
Sebastien Robin's avatar
Sebastien Robin committed
273 274
        else:
        #except:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
275 276 277 278 279 280 281
          LOG('CMFActivity:', 100, 'Core call to wakeup failed for activity %s' % activity)

      # Process messages on each queue in round robin
      has_awake_activity = 1
      while has_awake_activity:
        has_awake_activity = 0
        for activity in activity_list:
Sebastien Robin's avatar
Sebastien Robin committed
282 283
          #try:
          if 1:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
284 285
            activity.tic(self, processing_node) # Transaction processing is the responsability of the activity
            has_awake_activity = has_awake_activity or activity.isAwake(self, processing_node)
Sebastien Robin's avatar
Sebastien Robin committed
286 287
          #except:
          else:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
288 289 290
            LOG('CMFActivity:', 100, 'Core call to tic or isAwake failed for activity %s' % activity)

      # decrease the number of active_threads
Jean-Paul Smets's avatar
Jean-Paul Smets committed
291
      tic_lock.acquire()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
292
      active_threads -= 1
Jean-Paul Smets's avatar
Jean-Paul Smets committed
293
      tic_lock.release()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
294

295
    def hasActivity(self, *args, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
296
      # Check in each queue if the object has deferred tasks
297 298 299 300 301
      # if not argument is provided, then check on self
      if len(args) > 0:
        object = args[0]
      else:
        object = self
Jean-Paul Smets's avatar
Jean-Paul Smets committed
302 303 304 305 306
      for activity in activity_list:
        if activity.hasActivity(self, object, **kw):
          return 1
      return 0

307
    def activate(self, object, activity, active_process, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
308 309
      global is_initialized
      if not is_initialized: self.initialize()
310
      if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
311
      return ActiveWrapper(object, activity, active_process, **kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
312

313 314 315 316 317 318
    def deferredQueueMessage(self, activity, message):
      self._v_activity_buffer.deferredQueueMessage(self, activity, message)
      
    def deferredDeleteMessage(self, activity, message):
      self._v_activity_buffer.deferredDeleteMessage(self, activity, message)
          
Jean-Paul Smets's avatar
Jean-Paul Smets committed
319
    def getRegisteredMessageList(self, activity):
320 321 322 323
      if getattr(self, '_v_activity_buffer', None):
        return activity.getRegisteredMessageList(self._v_activity_buffer, self)
      else:
        return []
Jean-Paul Smets's avatar
Jean-Paul Smets committed
324 325
          
    def unregisterMessage(self, activity, message):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
326
      self._v_activity_buffer._register() # Required if called by flush, outside activate
Jean-Paul Smets's avatar
Jean-Paul Smets committed
327
      return activity.unregisterMessage(self._v_activity_buffer, self, message)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
328
          
Jean-Paul Smets's avatar
Jean-Paul Smets committed
329 330 331
    def flush(self, object, invoke=0, **kw):
      global is_initialized
      if not is_initialized: self.initialize()
332 333 334 335 336
      if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
      if type(object) is type(()):
        object_path = object
      else:
        object_path = object.getPhysicalPath()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
337 338 339 340
      for activity in activity_list:
        LOG('CMFActivity: ', 0, 'flushing activity %s' % activity.__class__.__name__)
        activity.flush(self, object_path, invoke=invoke, **kw)

341 342 343 344 345 346 347 348 349 350 351 352 353 354
    def start(self, **kw):
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__)
        activity.start(self, **kw)

    def stop(self, **kw):
      global is_initialized
      if not is_initialized: self.initialize()
      for activity in activity_list:
        LOG('CMFActivity: ', 0, 'starting activity %s' % activity.__class__.__name__)
        activity.stop(self, **kw)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
355 356 357
    def invoke(self, message):
      message(self)

358 359
    def newMessage(self, activity, path, active_process, activity_kw, method_id, *args, **kw):
      # Some Security Cheking should be made here XXX
Jean-Paul Smets's avatar
Jean-Paul Smets committed
360 361
      global is_initialized
      if not is_initialized: self.initialize()
362
      if not hasattr(self, '_v_activity_buffer'): self._v_activity_buffer = ActivityBuffer()
363
      activity_dict[activity].queueMessage(self, Message(path, active_process, activity_kw, method_id, args, kw))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
364 365 366 367 368 369 370

    def manageInvoke(self, object_path, method_id, REQUEST=None):
      """
        Invokes all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
371
      self.flush(object_path,method_id=method_id,invoke=1)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
372 373 374 375 376 377 378 379 380
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))

    def manageCancel(self, object_path, method_id, REQUEST=None):
      """
        Cancel all methods for object "object_path"
      """
      if type(object_path) is type(''):
        object_path = tuple(object_path.split('/'))
381
      self.flush(object_path,method_id=method_id,invoke=0)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
382 383 384 385 386 387 388 389
      if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' % (self.absolute_url(), 'manageActivities'))

    security.declarePublic('getMessageList')
    def getMessageList(self):
      """
        List messages waiting in queues
      """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
390 391 392
      # Initialize if needed
      if not is_initialized: self.initialize()

Jean-Paul Smets's avatar
Jean-Paul Smets committed
393 394
      message_list = []
      for activity in activity_list:
Sebastien Robin's avatar
Sebastien Robin committed
395 396 397 398
        try:
          message_list += activity.getMessageList(self)
        except AttributeError:
          LOG('getMessageList, could not get message from Activity:',0,activity)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
399 400
      return message_list

401
    security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
402
    def newActiveProcess(self, **kw):
403 404 405
      from ActiveProcess import addActiveProcess
      new_id = str(self.generateNewId())
      addActiveProcess(self, new_id)
406 407 408
      active_process = self._getOb(new_id)
      active_process.edit(**kw)
      return active_process
409 410 411 412 413

    def reindexObject(self):
      self.immediateReindexObject()

    def getActiveProcess(self):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
414 415 416
      active_process = getattr(self, '_v_active_process')
      if active_process:
        return self.unrestrictedTraverse(active_process)
417 418 419
      return None


420
InitializeClass(ActivityTool)