Queue.py 12.3 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import cPickle, sys
30
from DateTime import DateTime
31
from zLOG import LOG, WARNING, ERROR
Yoshinori Okuji's avatar
Yoshinori Okuji committed
32
from ZODB.POSException import ConflictError
33 34 35 36 37 38 39
import sha
from cStringIO import StringIO

try:
  from transaction import get as get_transaction
except ImportError:
  pass
Jean-Paul Smets's avatar
Jean-Paul Smets committed
40

41 42 43 44 45 46
# Error values for message validation
# These are the result codes returned by Queue.validate.
EXCEPTION      = -1  # validation raised an exception other than ConflictError
VALID          = 0   # the target object exists and no ordering constraint blocks the message
INVALID_PATH   = 1   # the object at message.object_path could not be traversed
INVALID_ORDER  = 2   # activity_tool.validateOrder reported an unmet ordering constraint

47
# Time global parameters
48 49
# NOTE(review): neither constant is referenced by the code visible in this
# file; presumably they are imported by concrete queue implementations --
# TODO confirm before changing the values.
MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 30 # in seconds
50

51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
def abortTransactionSynchronously():
  """Abort a transaction in a synchronous manner.

  Manual invocation of transaction abort does not synchronize
  connections with databases, thus invalidations are not cleared out.
  This may cause an infinite loop, because a read conflict error happens
  again and again on the same object.

  So, in this method, collect (potential) Connection objects used
  for current transaction, and invoke the sync method on every Connection
  object, then abort the transaction. In most cases, aborting the
  transaction is redundant, because sync should call abort implicitly.
  But if no connection is present, it is still required to call abort
  explicitly, and it does not cause any harm to call abort more than once.

  Only the import of the transaction package selects between the two
  code paths: an ImportError raised later (for instance from within a
  sync call) is propagated to the caller instead of being mistaken for
  an old Zope version.

  XXX this is really a hack. This touches the internal code of Transaction.
  """
  try:
    import transaction
  except ImportError:
    # Zope 2.7 and earlier: no transaction package, use the
    # get_transaction callable and the jars of the current transaction.
    t = get_transaction()
    jar_list = t._get_jars(t._objects, 0)
    for jar in jar_list:
      # Not every jar provides sync; skip the ones which do not.
      if hasattr(jar, 'sync'):
        jar.sync()
    t.abort()
  else:
    # Zope 2.8 and later: the resource managers joined to the current
    # transaction are the keys of its private _adapters mapping.
    manager_list = transaction.get()._adapters.keys()
    for manager in manager_list:
      # Not every manager provides sync; skip the ones which do not.
      if hasattr(manager, 'sync'):
        manager.sync()
    # Explicit abort, required when no manager could be synchronized.
    transaction.abort()

Jean-Paul Smets's avatar
Jean-Paul Smets committed
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
class Queue:
  """
    Generic activity queue.

    Most methods defined here are no-ops or return a neutral value
    (None, 0 or an empty list); presumably concrete queue implementations
    override them -- TODO confirm against the subclasses.

    Step 1: use lists

    Step 2: add some object related dict which prevents calling twice the same method

    Step 3: add some time information for deferred execution

    Step 4: use MySQL as a way to store events (with locks)

    Step 5: use periodic Timer to wakeup Scheduler

    Step 6: add multiple threads on a single Scheduler

    Step 7: add control thread to kill "events which last too long"

    Some data:

    - reindexObject = 50 ms

    - calling a MySQL read = 0.7 ms

    - calling a simple method by HTTP = 30 ms

    - calling a complex method by HTTP = 500 ms

    References:

    http://www.mysql.com/doc/en/InnoDB_locking_reads.html
    http://www.python.org/doc/current/lib/thread-objects.html
    http://www-poleia.lip6.fr/~briot/actalk/actalk.html
  """

  #scriptable_method_id_list = ['appendMessage', 'nextMessage', 'delMessage']

  def __init__(self):
    # Both dictionaries are keyed by processing node and hold 0/1 flags.
    self.is_alive = {}
    self.is_awake = {}
    self.is_initialized = 0
    self.max_processing_date = DateTime()

  def initialize(self, activity_tool):
    """Flag the queue as initialized; later calls change nothing."""
    # This is the only moment when
    # we can set some global variables related
    # to the ZODB context
    if not self.is_initialized:
      self.is_initialized = 1

  def queueMessage(self, activity_tool, m):
    """Hand message m over to the activity tool for deferred queueing."""
    activity_tool.deferredQueueMessage(self, m)

  def deleteMessage(self, activity_tool, m):
    """Ask the activity tool to delete m, unless m is already flagged deleted.

    The message is flagged with is_deleted = 1 in every case.
    """
    if not getattr(m, 'is_deleted', 0):
      # We try not to delete twice
      # However this can not be guaranteed in the case of messages loaded from SQL
      activity_tool.deferredDeleteMessage(self, m)
    m.is_deleted = 1

  def dequeueMessage(self, activity_tool, processing_node):
    """Do nothing here and return None.

    tic puts the node to sleep when this method returns a true value.
    """
    pass

  def tic(self, activity_tool, processing_node):
    """Call dequeueMessage once; put the node to sleep if it returns true."""
    # Tic should return quickly to prevent locks or commit transactions at some point
    if self.dequeueMessage(activity_tool, processing_node):
      self.sleep(activity_tool, processing_node)

  def distribute(self, activity_tool, node_count):
    """Do nothing here."""
    pass

  def sleep(self, activity_tool, processing_node):
    """Flag processing_node as not awake."""
    self.is_awake[processing_node] = 0

  def wakeup(self, activity_tool, processing_node):
    """Flag processing_node as awake."""
    self.is_awake[processing_node] = 1

  def terminate(self, activity_tool, processing_node):
    """Flag processing_node as neither awake nor alive."""
    self.is_awake[processing_node] = 0
    self.is_alive[processing_node] = 0

  def validate(self, activity_tool, message, check_order_validation=1, **kw):
    """
      This is the place where activity semantics is implemented
      **kw contains all parameters which allow to implement synchronisation,
      constraints, delays, etc.

      Standard synchronisation parameters:

      after_method_id   --  never validate message if after_method_id
                            is in the list of methods which are
                            going to be executed

      after_message_uid --  never validate message if after_message_uid
                            is in the list of messages which are
                            going to be executed

      after_path        --  never validate message if after_path
                            is in the list of path which are
                            going to be executed

      Return value:

      INVALID_PATH  --  the object at message.object_path cannot be traversed

      INVALID_ORDER --  check_order_validation is true and
                        activity_tool.validateOrder returned a true value
                        for one of the **kw items

      EXCEPTION     --  any exception other than ConflictError was raised
                        (it is logged, not propagated)

      VALID         --  otherwise

      ConflictError is always propagated to the caller.
    """
    try:
      if activity_tool.unrestrictedTraverse(message.object_path, None) is None:
        # Do not try to call methods on objects which do not exist
        LOG('CMFActivity', WARNING,
           'Object %s does not exist' % '/'.join(message.object_path))
        return INVALID_PATH
      if check_order_validation:
        for k, v in kw.iteritems():
          if activity_tool.validateOrder(message, k, v):
            return INVALID_ORDER
    except ConflictError:
      # Conflicts must reach the caller, never be reported as EXCEPTION.
      raise
    except:
      LOG('CMFActivity', WARNING,
          'Validation of Object %s raised exception' % '/'.join(message.object_path),
          error=sys.exc_info())
      # Do not try to call methods on objects which cause errors
      return EXCEPTION
    return VALID

  def getDependentMessageList(self, activity_tool, message, **kw):
    """Collect the results of activity_tool.getDependentMessageList.

    The activity tool is queried once per **kw item; every non-empty
    result is appended to the returned list.
    """
    message_list = []
    for k, v in kw.iteritems():
      result = activity_tool.getDependentMessageList(message, k, v)
      if result:
        message_list.extend(result)
    return message_list

  def getExecutableMessageList(self, activity_tool, message, message_dict,
                               validation_text_dict):
    """Get messages which have no dependent message, and store them in the dictionary.

    If the passed message itself is executable, simply store only that message.
    Otherwise, try to find at least one message executable from dependent messages.

    This may result in no new message, if all dependent messages are already present
    in the dictionary, if all dependent messages are in different activities, or if
    the message has a circular dependency.

    The validation text dictionary is used only to cache the results of validations,
    in order to reduce the number of SQL queries. It maps an order validation text
    to 1 (executable) or 0 (not executable).
    """
    if message.uid in message_dict:
      # Nothing to do. But detect a circular dependency.
      if message_dict[message.uid] is None:
        LOG('CMFActivity', ERROR,
            'message uid %r has a circular dependency' % (message.uid,))
      return

    cached_result = validation_text_dict.get(message.order_validation_text)
    if cached_result is None:
      message_list = message.getDependentMessageList(self, activity_tool)
      get_transaction().commit() # Release locks.
      if message_list:
        # The result is not empty, so this message is not executable.
        validation_text_dict[message.order_validation_text] = 0
        now_date = DateTime()
        for activity, m in message_list:
          # Note that the messages may contain ones which are already assigned or not
          # executable yet.
          if activity is self and m.processing_node == -1 and m.date <= now_date:
            # Call recursively. Set None as a marker to detect a circular dependency.
            message_dict[message.uid] = None
            try:
              self.getExecutableMessageList(activity_tool, m, message_dict,
                                             validation_text_dict)
            finally:
              # The marker is only meaningful during the recursive call.
              del message_dict[message.uid]
      else:
        validation_text_dict[message.order_validation_text] = 1
        message_dict[message.uid] = message
    elif cached_result:
      message_dict[message.uid] = message
    # A cached false value means "not executable": nothing is stored.

  def isAwake(self, activity_tool, processing_node):
    """Return the awake flag of processing_node.

    A node for which neither wakeup, sleep nor terminate has been called
    yet is reported as not awake (0) instead of raising KeyError.
    """
    return self.is_awake.get(processing_node, 0)

  def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw):
    """Return 0 here."""
    return 0

  def flush(self, activity_tool, object, **kw):
    """Do nothing here."""
    pass

  def start(self, active_process=None):
    """Do nothing here."""
    # Start queue / activities in queue for given process
    pass

  def stop(self, active_process=None):
    """Do nothing here."""
    # Stop queue / activities in queue for given process
    pass

  def loadMessage(self, s, **kw):
    """Unpickle a message from the string s and set **kw as attributes on it.

    WARNING: unpickling can execute arbitrary code, so s must only come
    from a trusted store.
    """
    m = cPickle.load(StringIO(s))
    m.__dict__.update(kw)
    return m

  def dumpMessage(self, m):
    """Return message m pickled as a string."""
    return cPickle.dumps(m)

  def getOrderValidationText(self, message):
    """Return an identifier of validators related to ordering.

    Only the keys of message.activity_kw for which this queue has a
    non-None "_validate_<key>" attribute are taken into account. The
    result is 'none' if there is no such key, else the SHA1 hexdigest of
    the repr of the (key, value) pairs sorted by key.
    """
    order_validation_item_list = []
    key_list = message.activity_kw.keys()
    # Sort so that the text does not depend on dictionary ordering.
    key_list.sort()
    for key in key_list:
      method_id = "_validate_%s" % key
      if getattr(self, method_id, None) is not None:
        order_validation_item_list.append((key, message.activity_kw[key]))
    if not order_validation_item_list:
      # When no order validation argument is specified, skip the computation
      # of the checksum for speed. Here, 'none' is used, because this never be
      # identical to SHA1 hexdigest (which is always 40 characters), and 'none'
      # is true in Python. This is important, because dtml-if assumes that an empty
      # string is false, so we must use a non-empty string for this.
      return 'none'
    return sha.new(repr(order_validation_item_list)).hexdigest()

  def getMessageList(self, activity_tool, processing_node=None,**kw):
    """Return an empty list here."""
    return []

  def countMessage(self, activity_tool,**kw):
    """Return 0 here."""
    return 0

  def countMessageWithTag(self, activity_tool,value):
    """Return 0 here."""
    return 0

  # Transaction Management
  def prepareQueueMessage(self, activity_tool, m):
    """Do nothing here."""
    # Called to prepare transaction commit for queued messages
    pass

  def finishQueueMessage(self, activity_tool_path, m):
    """Do nothing here."""
    # Called to commit queued messages
    pass

  def prepareDeleteMessage(self, activity_tool, m):
    """Do nothing here."""
    # Called to prepare transaction commit for deleted messages
    pass

  def finishDeleteMessage(self, activity_tool_path, m):
    """Do nothing here."""
    # Called to commit deleted messages
    pass

  # Registration Management
  def registerActivityBuffer(self, activity_buffer):
    """Do nothing here."""
    pass

  def isMessageRegistered(self, activity_buffer, activity_tool, m):
    """Tell whether m is in the list the buffer keeps for this queue."""
    message_list = activity_buffer.getMessageList(self)
    return m in message_list

  def registerMessage(self, activity_buffer, activity_tool, m):
    """Append m to the list the buffer keeps for this queue and flag it."""
    message_list = activity_buffer.getMessageList(self)
    message_list.append(m)
    m.is_registered = 1

  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """Clear the is_registered flag of m; m stays in the buffer list."""
    m.is_registered = 0

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """Return the buffered messages of this queue whose is_registered is true."""
    message_list = activity_buffer.getMessageList(self)
    return [m for m in message_list if m.is_registered]

  # Required for tests (time shift)
  def timeShift(self, activity_tool, delay):
    """
      delay is provided in fractions of day

      Do nothing here.
    """
    pass