SQLDict.py 29.8 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import random
30
from DateTime import DateTime
Jean-Paul Smets's avatar
Jean-Paul Smets committed
31
from Products.CMFActivity.ActivityTool import registerActivity
32
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
Jean-Paul Smets's avatar
Jean-Paul Smets committed
33
from RAMDict import RAMDict
34
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
35
from ZODB.POSException import ConflictError
36
import sys
37
import sha
38
from types import ClassType, StringType, ListType, TupleType
Jean-Paul Smets's avatar
Jean-Paul Smets committed
39

40 41 42 43 44
try:
  from transaction import get as get_transaction
except ImportError:
  pass

45
from zLOG import LOG, TRACE, WARNING
Jean-Paul Smets's avatar
Jean-Paul Smets committed
46

47
# A failing message whose priority is already above this value is put in an
# error state instead of being retried with a lower priority.
MAX_PRIORITY = 5
# Upper bound on the number of objects handed to one group method invocation.
MAX_GROUPED_OBJECTS = 500

# Weighted pool from which dequeueMessage draws a priority at random:
# out of 100 entries, priority 1 appears 64 times and priority 5 only once.
priority_weight = \
  [1] * 64 + \
  [2] * 20 + \
  [3] * 10 + \
  [4] * 5 + \
  [5] * 1
56

57 58 59
class ActivityFlushError(Exception):
    """Raised by SQLDict.flush when a flushed message cannot be invoked:
    either its validation reports an invalid path, or the invocation
    leaves the message not executed."""

Jean-Paul Smets's avatar
Jean-Paul Smets committed
60 61 62 63 64 65
class SQLDict(RAMDict):
  """
    An activity queue which stores its messages through the SQLDict_*
    SQL methods of the activity tool. Within an activity buffer, messages
    are keyed by (object path, method id), so a given method is registered
    only once per object.
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
66
  # Transaction commit methods
67
  def prepareQueueMessage(self, activity_tool, m):
    """
      Write one message through activity_tool.SQLDict_writeMessage.

      Nothing is written unless m.is_registered is true. Activity keywords
      missing from m.activity_kw default to priority 1, broadcast 0, the
      current date, and an empty group_method_id and tag.
    """
    if not m.is_registered:
      return
    activity_kw = m.activity_kw
    activity_tool.SQLDict_writeMessage(
        path = '/'.join(m.object_path),
        method_id = m.method_id,
        priority = activity_kw.get('priority', 1),
        broadcast = activity_kw.get('broadcast', 0),
        message = self.dumpMessage(m),
        date = activity_kw.get('at_date', DateTime()),
        group_method_id = activity_kw.get('group_method_id', ''),
        tag = activity_kw.get('tag', ''),
        order_validation_text = self.getOrderValidationText(m))
    # Also store uid of activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
79

80 81 82 83 84 85
  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Write all registered messages of message_list with a single call to
      activity_tool.SQLDict_writeMessageList.

      Messages whose is_registered flag is false are skipped; when none is
      left, no call is made. Defaults for missing activity keywords are the
      same as in prepareQueueMessage, and all messages without an 'at_date'
      share the same current date.
    """
    registered_message_list = [message for message in message_list
                               if message.is_registered]
    if not registered_message_list:
      return
    path_list = ['/'.join(message.object_path) for message in registered_message_list]
    method_id_list = [message.method_id for message in registered_message_list]
    priority_list = [message.activity_kw.get('priority', 1) for message in registered_message_list]
    broadcast_list = [message.activity_kw.get('broadcast', 0) for message in registered_message_list]
    dumped_message_list = [self.dumpMessage(message) for message in registered_message_list]
    now = DateTime()
    date_list = [message.activity_kw.get('at_date', now) for message in registered_message_list]
    group_method_id_list = [message.activity_kw.get('group_method_id', '') for message in registered_message_list]
    tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
    order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list]
    activity_tool.SQLDict_writeMessageList(
        path_list = path_list,
        method_id_list = method_id_list,
        priority_list = priority_list,
        broadcast_list = broadcast_list,
        message_list = dumped_message_list,
        date_list = date_list,
        group_method_id_list = group_method_id_list,
        tag_list = tag_list,
        order_validation_text_list = order_validation_text_list)
106

107 108
  def prepareDeleteMessage(self, activity_tool, m):
    """
      Delete every stored message which has the same path, method id and
      order validation text as m.

      The uids are read with SQLDict_readUidList and removed with one
      SQLDict_delMessage call; no delete is issued when nothing matches.
    """
    # Erase all messages in a single transaction
    line_list = activity_tool.SQLDict_readUidList(
        path = '/'.join(m.object_path),
        method_id = m.method_id,
        order_validation_text = self.getOrderValidationText(m),
        processing_node = None)
    uid_list = [line.uid for line in line_list]
    if uid_list:
      activity_tool.SQLDict_delMessage(uid = uid_list)

  # Registration management
Jean-Paul Smets's avatar
Jean-Paul Smets committed
119
  def registerActivityBuffer(self, activity_buffer):
    """
      Attach to activity_buffer the two containers used by this activity
      class: an empty '_<ClassName>_uid_dict' dictionary and an empty
      '_<ClassName>_message_list' list. Existing values are replaced.
    """
    prefix = '_%s' % self.__class__.__name__
    setattr(activity_buffer, prefix + '_uid_dict', {})
    setattr(activity_buffer, prefix + '_message_list', [])

Jean-Paul Smets's avatar
Jean-Paul Smets committed
124
  def isMessageRegistered(self, activity_buffer, activity_tool, m):
    """
      Tell whether a message with the same (object path, method id) pair
      as m is already recorded in the buffer's uid dictionary.
    """
    uid_dict = getattr(activity_buffer, '_%s_uid_dict' % self.__class__.__name__)
    return (tuple(m.object_path), m.method_id) in uid_dict
128

Jean-Paul Smets's avatar
Jean-Paul Smets committed
129 130
  def registerMessage(self, activity_buffer, activity_tool, m):
    """
      Mark m as registered, record its (object path, method id) key in the
      buffer's uid dictionary and append it to the buffer's message list.
    """
    m.is_registered = 1
    class_name = self.__class__.__name__
    uid_dict = getattr(activity_buffer, '_%s_uid_dict' % class_name)
    uid_dict[(tuple(m.object_path), m.method_id)] = 1
    message_list = getattr(activity_buffer, '_%s_message_list' % class_name)
    message_list.append(m)
135

Jean-Paul Smets's avatar
Jean-Paul Smets committed
136 137
  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """
      Mark m as not registered and drop its (object path, method id) key
      from the buffer's uid dictionary, if present.

      The message stays in the buffer's message list; it is the cleared
      is_registered flag which keeps it out of the queue.
    """
    m.is_registered = 0 # This prevents from inserting deleted messages into the queue
    uid_dict = getattr(activity_buffer, '_%s_uid_dict' % self.__class__.__name__)
    key = (tuple(m.object_path), m.method_id)
    if key in uid_dict:
      del uid_dict[key]
Jean-Paul Smets's avatar
Jean-Paul Smets committed
142 143

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """
      Return the list of messages of the buffer which are still registered.

      An empty tuple is returned when the buffer has no message list for
      this activity class.
    """
    attribute_id = '_%s_message_list' % self.__class__.__name__
    if not hasattr(activity_buffer, attribute_id):
      return ()
    return [m for m in getattr(activity_buffer, attribute_id) if m.is_registered]
150

151 152 153 154 155 156 157 158 159 160
  def getOrderValidationText(self, message):
    """
      Return an identifier of the ordering validators which apply to message.

      Only the activity keywords for which this class has a '_validate_<key>'
      method are taken into account. The result is the SHA1 hexdigest of the
      repr of the sorted (key, value) pairs, or 'none' when there is no such
      keyword.
    """
    key_list = list(message.activity_kw.keys())
    key_list.sort()
    order_validation_item_list = [(key, message.activity_kw[key])
                                  for key in key_list
                                  if hasattr(self, "_validate_%s" % key)]
    if not order_validation_item_list:
      # When no order validation argument is specified, skip the computation
      # of the checksum for speed. 'none' can never be identical to a SHA1
      # hexdigest (which is always 40 characters long), and it is true in
      # Python. The latter matters because dtml-if treats an empty string as
      # false, so a non-empty string must be used here.
      return 'none'
    return sha.new(repr(order_validation_item_list)).hexdigest()
168

169
  def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
    """
      Validate message and reschedule it when it is not valid.

      Returns 1 when message.validate reports VALID, 0 otherwise. In the
      latter case the transaction is committed after the stored messages
      have been updated:

      - EXCEPTION or INVALID_PATH: the messages of uid_list are moved to
        VALIDATE_ERROR_STATE when priority is above MAX_PRIORITY, else they
        are delayed with their priority lowered by one step;
      - any other state (ordering problem): the priority is kept, but the
        messages of processing_node which share the same order validation
        text are delayed.
    """
    validation_state = message.validate(self, activity_tool)
    if validation_state is VALID:
      return 1
    if validation_state in (EXCEPTION, INVALID_PATH):
      # There is a serious validation error - we must lower priority
      if priority > MAX_PRIORITY:
        # This is an error
        if len(uid_list) > 0:
          # Assign message back to 'error' state
          activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
        #m.notifyUser(activity_tool)                                      # Notify Error
        get_transaction().commit()                                        # and commit
      else:
        # Lower priority
        if len(uid_list) > 0: # Add some delay before new processing
          activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
                                            priority = priority + 1, retry = 1)
        get_transaction().commit() # Release locks before starting a potentially long calculation
    else:
      # We do not lower priority for INVALID_ORDER errors but we do postpone execution
      order_validation_text = self.getOrderValidationText(message)
      activity_tool.SQLDict_setPriority(order_validation_text = order_validation_text,
                                        processing_node = processing_node,
                                        delay = VALIDATION_ERROR_DELAY,
                                        retry = 1,
                                        uid = None)
      get_transaction().commit() # Release locks before starting a potentially long calculation
    return 0
198

Jean-Paul Smets's avatar
Jean-Paul Smets committed
199
  # Queue semantic
Jean-Paul Smets's avatar
Jean-Paul Smets committed
200
  def dequeueMessage(self, activity_tool, processing_node):
    """
      Pick one message scheduled for now on processing_node and invoke it,
      together with other messages sharing its group method when it has one.

      A priority is first drawn at random from priority_weight; if no message
      is found at that priority, any priority is accepted.

      Returns 1 when nothing was picked (SQLDict_readMessage is not available
      on activity_tool, or no message is scheduled for now; in the latter
      case dates are shifted with timeShift). Returns 0 in every other case,
      whether the invocation succeeded or not.
    """
    readMessage = getattr(activity_tool, 'SQLDict_readMessage', None)
    if readMessage is None:
      return 1

    now_date = DateTime()
    priority = random.choice(priority_weight)
    # Try to find a message at given priority level which is scheduled for now
    result = readMessage(processing_node=processing_node, priority=priority,
                         to_date=now_date)
    if len(result) == 0:
      # If empty, take any message which is scheduled for now
      priority = None
      result = readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
    if len(result) == 0:
      # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
      # objects quickly.
      self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1)
    else:
      #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
      line = result[0]
      path = line.path
      method_id = line.method_id
      order_validation_text = line.order_validation_text
      uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
                                                   processing_node = None, to_date = now_date,
                                                   order_validation_text = order_validation_text)
      uid_list = [x.uid for x in uid_list]
      # uid_list_list, priority_list and (later) message_list are kept
      # parallel: entry i of each describes the same message.
      uid_list_list = [uid_list]
      priority_list = [line.priority]
      # Make sure message can not be processed anylonger
      if len(uid_list) > 0:
        # Set selected messages to processing
        activity_tool.SQLDict_processMessage(uid = uid_list)
      get_transaction().commit() # Release locks before starting a potentially long calculation
      # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state

      # At this point, messages are marked as processed. So catch any kind of exception to make sure
      # that they are unmarked on error.
      try:
        m = self.loadMessage(line.message, uid = line.uid)
        message_list = [m]
        # Validate message (make sure object exists, priority OK, etc.)
        if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
          return 0

        group_method_id = m.activity_kw.get('group_method_id')
        if group_method_id is not None:
          # Count the number of objects to prevent too many objects.
          if m.hasExpandMethod():
            count = len(m.getObjectList(activity_tool))
          else:
            count = 1

          # The result is not used, but the traversal fails early (inside
          # this try block) if the group method can not be reached.
          group_method = activity_tool.restrictedTraverse(group_method_id)

          if count < MAX_GROUPED_OBJECTS:
            # Retrieve objects which have the same group method.
            result = readMessage(processing_node = processing_node, priority = priority,
                                to_date = now_date, group_method_id = group_method_id,
                                order_validation_text = order_validation_text)
            #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
            for line in result:
              path = line.path
              method_id = line.method_id
              uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
                                                            processing_node = None, to_date = now_date,
                                                            order_validation_text = order_validation_text)
              uid_list = [x.uid for x in uid_list]
              if len(uid_list) > 0:
                # Set selected messages to processing
                activity_tool.SQLDict_processMessage(uid = uid_list)
              get_transaction().commit() # Release locks before starting a potentially long calculation

              # Save this newly marked uids as soon as possible.
              uid_list_list.append(uid_list)

              m = self.loadMessage(line.message, uid = line.uid)
              if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
                if m.hasExpandMethod():
                  count += len(m.getObjectList(activity_tool))
                else:
                  count += 1
                message_list.append(m)
                priority_list.append(line.priority)
                if count >= MAX_GROUPED_OBJECTS:
                  break
              else:
                # If the uids were not valid, remove them from the list, as validateMessage
                # unmarked them.
                uid_list_list.pop()

          # Release locks before starting a potentially long calculation
          get_transaction().commit()

        # Try to invoke
        if group_method_id is not None:
          LOG('SQLDict', TRACE,
              'invoking a group method %s with %d objects '\
              ' (%d objects in expanded form)' % (
              group_method_id, len(message_list), count))
          activity_tool.invokeGroup(group_method_id, message_list)
        else:
          activity_tool.invoke(message_list[0])

        # Check if messages are executed successfully.
        # When some of them are executed successfully, it may not be acceptable to
        # abort the transaction, because these remain pending, only due to other
        # invalid messages. This means that a group method should not be used if
        # it has a side effect. For now, only indexing uses a group method, and this
        # has no side effect.
        for m in message_list:
          if m.is_executed:
            get_transaction().commit()
            break
        else:
          get_transaction().abort()
      except:
        # Remember which exception brought us here BEFORE trying to abort:
        # in Python 2, if abort raises and that error is caught below,
        # sys.exc_info() would afterwards describe the abort failure
        # instead of the original exception.
        exc_type = sys.exc_info()[0]
        # If an exception occurs, abort the transaction to minimize the impact,
        try:
          get_transaction().abort()
        except:
          # Unfortunately, database adapters may raise an exception against abort.
          LOG('SQLDict', WARNING, 'abort failed, thus some objects may be modified accidentally')
        if issubclass(exc_type, ConflictError):
          # For a conflict error, simply delay the operations.
          for uid_list in uid_list_list:
            if len(uid_list):
              activity_tool.SQLDict_setPriority(uid = uid_list,
                                                delay = VALIDATION_ERROR_DELAY,
                                                retry = 1)
        else:
          # For other exceptions, put the messages to an invalid state immediately.
          for uid_list in uid_list_list:
            if len(uid_list):
              activity_tool.SQLDict_assignMessage(uid = uid_list,
                                                  processing_node = INVOKE_ERROR_STATE)
        get_transaction().commit()
        return 0

      # Update the stored messages according to the outcome of each invocation.
      for m, uid_list, priority in zip(message_list, uid_list_list, priority_list):
        if m.is_executed:
          if len(uid_list) > 0:
            activity_tool.SQLDict_delMessage(uid = uid_list)                # Delete it
          get_transaction().commit()                                        # If successful, commit
          if m.active_process:
            active_process = activity_tool.unrestrictedTraverse(m.active_process)
            if not active_process.hasActivity():
              # No more activity
              m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
        else:
          if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
            # If this is a conflict error, do not lower the priority but only delay.
            # As everywhere else, never issue the SQL update with an empty uid list.
            if len(uid_list) > 0:
              activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
                                                retry = 1)
            get_transaction().commit() # Release locks before starting a potentially long calculation
          elif priority > MAX_PRIORITY:
            # This is an error
            if len(uid_list) > 0:
              # Assign message back to 'error' state
              activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
            m.notifyUser(activity_tool)                                       # Notify Error
            get_transaction().commit()                                        # and commit
          else:
            # Lower priority
            if len(uid_list) > 0:
              activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
                                                priority = priority + 1, retry = 1)
              get_transaction().commit() # Release locks before starting a potentially long calculation

      return 0
    get_transaction().commit() # Release locks before starting a potentially long calculation
    return 1

378
  def hasActivity(self, activity_tool, object, **kw):
    """
      Tell whether messages are stored for the given object.

      Returns 0 when SQLDict_hasMessage is not available on activity_tool or
      when the query yields no line, 1 when object is None, and otherwise
      the truth of 'message_count > 0' on the first line of the query made
      with the physical path of object and the extra keywords.
    """
    hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
    if hasMessage is None:
      return 0
    if object is None:
      # Default behaviour if no object specified is to return 1 until active_process implemented
      return 1
    my_object_path = '/'.join(object.getPhysicalPath())
    result = hasMessage(path=my_object_path, **kw)
    if len(result) > 0:
      return result[0].message_count > 0
    return 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
390
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove the messages related to object_path from the queue, optionally
      invoking them first.

      object_path is a tuple
      invoke -- when true, each flushed message is validated and invoked
      method_id -- when given, only messages for this method are flushed

      Both the messages registered in the current transaction and the ones
      already stored through SQL are handled. For a given method id, a
      message is invoked only once.

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used; this implementation does not
      read it.

      NOTE: committing is very likely nonsense here. We should just avoid
      flushing as much as possible.

      Raises ActivityFlushError when invoke is true and a message can not be
      invoked. Nothing is done when SQLDict_readMessageList is not available
      on activity_tool.
    """
    path = '/'.join(object_path)
    # LOG('Flush', 0, str((path, invoke, method_id)))
    method_dict = {} # method ids already handled, to prevent calling invoke twice
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return
    # Parse each message in registered
    for m in activity_tool.getRegisteredMessageList(self):
      if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
        activity_tool.unregisterMessage(self, m)
        if m.method_id not in method_dict:
          method_dict[m.method_id] = 1
          if invoke:
            self._invokeFlushedMessage(activity_tool, m, path)
    # Parse each message in SQL dict
    result = readMessageList(path=path, method_id=method_id,
                             processing_node=None,include_processing=0)
    for line in result:
      if line.method_id not in method_dict:
        # Only invoke once (it would be different for a queue)
        # This is optimisation with the goal to process objects on the same
        # node and minimize network traffic with ZEO server
        method_dict[line.method_id] = 1
        m = self.loadMessage(line.message, uid = line.uid)
        self.deleteMessage(activity_tool, m)
        if invoke:
          self._invokeFlushedMessage(activity_tool, m, line.path)

  def _invokeFlushedMessage(self, activity_tool, m, path):
    # Validate then invoke a message being flushed. path is only used to
    # build error messages. Raises ActivityFlushError if the message is
    # valid but is not executed, or if its path is invalid; any other
    # validation state is silently ignored.
    validate_value = m.validate(self, activity_tool)
    if validate_value is VALID:
      activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
      if not m.is_executed:
        # Make sure message could be invoked
        raise ActivityFlushError(
            'Could not evaluate %s on %s' % (m.method_id , path))
    elif validate_value is INVALID_PATH:
      # The message no longer exists
      raise ActivityFlushError(
          'The document %s does not exist' % path)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
455

456
  def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
    """
      Return the stored messages as a list of message objects, each one
      annotated with the processing_node, priority and processing values
      of its line.

      An empty list is returned when SQLDict_readMessageList is not
      available on activity_tool.

      NOTE(review): the processing_node argument and the extra keywords are
      not forwarded to the query; the query is always made with
      processing_node=None.
    """
    # YO: reading all lines might cause a deadlock
    message_list = []
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return message_list
    result = readMessageList(path=None, method_id=None, processing_node=None,
                             to_processing_date=None,include_processing=include_processing)
    for line in result:
      m = self.loadMessage(line.message, uid = line.uid)
      m.processing_node = line.processing_node
      m.priority = line.priority
      m.processing = line.processing
      message_list.append(m)
    return message_list

471 472 473
  def dumpMessageList(self, activity_tool):
    """
      Return every line given by SQLDict_dumpMessageList as a list of
      loaded message objects.

      An empty list is returned when that method is not available on
      activity_tool.
    """
    # Dump all messages in the table.
    dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
    if dumpMessageList is None:
      return []
    return [self.loadMessage(line.message, uid = line.uid)
            for line in dumpMessageList()]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
482 483
  def distribute(self, activity_tool, node_count):
    """
      Assign the messages which have no processing node yet (those read
      with processing_node = -1) to nodes 1..node_count.

      - A broadcast message is assigned to node 1 and copied once for each
        other node.
      - Other messages are assigned per path, in round robin: all the
        messages of a path go to the same node.

      Nothing is done when SQLDict_readMessageList is not available on
      activity_tool.

      NOTE(review): self.max_processing_date is read before being assigned
      here; it is presumably initialised by a base class - confirm.
    """
    processing_node = 1
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return
    now_date = DateTime()
    if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
      # Sticky processing messages should be set back to non processing
      max_processing_date = now_date - MAX_PROCESSING_TIME
      self.max_processing_date = now_date
    else:
      max_processing_date = None
    result = readMessageList(path=None, method_id=None, processing_node = -1,
                             to_processing_date = max_processing_date,
                             include_processing=0) # Only assign non assigned messages
    get_transaction().commit() # Release locks before starting a potentially long calculation
    path_dict = {}
    for line in result:
      path = line.path
      if line.broadcast:
        # Broadcast messages must be distributed into all nodes.
        activity_tool.SQLDict_assignMessage(processing_node=1, uid=[line.uid])
        if node_count > 1:
          for node in range(2, node_count+1):
            activity_tool.SQLDict_writeMessage( path = path,
                                                method_id = line.method_id,
                                                priority = line.priority,
                                                broadcast = 1,
                                                processing_node = node,
                                                message = line.message,
                                                date = line.date)
      elif path not in path_dict:
        # Only assign once (it would be different for a queue)
        path_dict[path] = 1
        activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None, broadcast=0)
        get_transaction().commit() # Release locks immediately to allow processing of messages
        processing_node = processing_node + 1
        if processing_node > node_count:
          processing_node = 1 # Round robin
Jean-Paul Smets's avatar
Jean-Paul Smets committed
522

523 524 525
  # Validation private methods
  def _validate_after_method_id(self, activity_tool, message, value):
    """
      Validator for the 'after_method_id' activity keyword.

      value is a method id or a sequence of method ids. Returns
      INVALID_ORDER while SQLDict_validateMessageList counts at least one
      message for these method ids, VALID otherwise. An empty sequence is
      always valid.
    """
    # Count number of occurrences of method_id
    if isinstance(value, StringType):
      value = [value]
    if len(value)>0: # if empty list provided, the message is valid
      result = activity_tool.SQLDict_validateMessageList(method_id=value, message_uid=None, path=None)
      if result[0].uid_count > 0:
        return INVALID_ORDER
    return VALID
533

534 535
  def _validate_after_path(self, activity_tool, message, value):
    """
      Validator for the 'after_path' activity keyword.

      value is a path or a sequence of paths. Returns INVALID_ORDER while
      SQLDict_validateMessageList counts at least one message for these
      paths, VALID otherwise. An empty sequence is always valid.
    """
    # Count number of occurrences of path
    if isinstance(value, StringType):
      value = [value]
    if len(value)>0: # if empty list provided, the message is valid
      result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, path=value)
      if result[0].uid_count > 0:
        return INVALID_ORDER
    return VALID
543

544 545 546 547 548 549 550
  def _validate_after_message_uid(self, activity_tool, message, value):
    """
      Validator for the 'after_message_uid' activity keyword.

      Returns INVALID_ORDER while SQLDict_validateMessageList counts at
      least one message for the uid given as value, VALID otherwise.
    """
    count_line = activity_tool.SQLDict_validateMessageList(
        method_id=None, message_uid=value, path=None)[0]
    if count_line.uid_count > 0:
      return INVALID_ORDER
    return VALID

551 552
  def _validate_after_path_and_method_id(self, activity_tool, message, value):
    """
      Validator for the 'after_path_and_method_id' activity keyword.

      value must be a tuple or list whose first two items are the path(s)
      and the method id(s); each may be a single string or a sequence.
      Returns INVALID_ORDER while SQLDict_validateMessageList counts at
      least one message matching both, VALID otherwise. A value which is
      not recognized is logged and considered valid.
    """
    # Count number of occurrences of path and method_id
    if not isinstance(value, (TupleType, ListType)) or len(value)<2:
      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method_id : %s' % repr(value))
      return VALID
    path = value[0]
    method = value[1]
    if isinstance(path, StringType):
      path = [path]
    if isinstance(method, StringType):
      method = [method]
    result = activity_tool.SQLDict_validateMessageList(method_id=method, message_uid=None, path=path)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID

567 568
  def _validate_after_tag(self, activity_tool, message, value):
    """
      Validator for the 'after_tag' activity keyword.

      Returns INVALID_ORDER while countMessageWithTag reports at least one
      message for the tag(s) given as value, VALID otherwise.
    """
    # Count number of occurrences of tag
    if self.countMessageWithTag(activity_tool, value) > 0:
      return INVALID_ORDER
    return VALID
572

Sebastien Robin's avatar
Sebastien Robin committed
573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591
  def countMessage(self, activity_tool, tag=None,path=None,
                   method_id=None,message_uid=None,**kw):
    """
      Count the messages which match the given criteria.

      tag, path and method_id may each be a string or a sequence, and
      message_uid an integer or a sequence; single values are wrapped in a
      list before being passed to SQLDict_validateMessageList. Extra
      keywords are accepted but not used.
    """
    if isinstance(method_id, StringType):
      method_id = [method_id]
    if isinstance(message_uid, (int,long)):
      message_uid = [message_uid]
    if isinstance(path, StringType):
      path = [path]
    if isinstance(tag, StringType):
      tag = [tag]
    count_line = activity_tool.SQLDict_validateMessageList(
        method_id=method_id, path=path, message_uid=message_uid, tag=tag)[0]
    return count_line.uid_count

592 593 594 595
  def countMessageWithTag(self, activity_tool, value):
    """
      Return the number of messages which match the given tag.

      This is a shortcut for countMessage(activity_tool, tag=value).
    """
    return self.countMessage(activity_tool, tag=value)
597

598 599
  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
    """
      Validator for the 'after_tag_and_method_id' activity keyword.

      value must be a tuple or list whose first two items are the tag(s)
      and the method id(s); each may be a single string or a sequence.
      Returns INVALID_ORDER while SQLDict_validateMessageList counts at
      least one message matching both, VALID otherwise. A value which is
      not recognized is logged and considered valid.
    """
    # Count number of occurrences of tag and method_id
    if not isinstance(value, (TupleType, ListType)) or len(value)<2:
      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value))
      return VALID
    tag = value[0]
    method = value[1]
    if isinstance(tag, StringType):
      tag = [tag]
    if isinstance(method, StringType):
      method = [method]
    result = activity_tool.SQLDict_validateMessageList(method_id=method, message_uid=None, tag=tag)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID

614
  # Required for tests (time shift)
615
  def timeShift(self, activity_tool, delay, processing_node=None,retry=None):
    """
      To simulate a time shift, we simply subtract delay from
      all dates in the SQLDict message table.

      The work is delegated to activity_tool.SQLDict_timeShift, which
      receives delay, processing_node and retry unchanged.
    """
    activity_tool.SQLDict_timeShift(delay = delay,
                                    processing_node = processing_node,
                                    retry = retry)
621

Jean-Paul Smets's avatar
Jean-Paul Smets committed
622
# Hand the SQLDict class over to ActivityTool.registerActivity at import time.
registerActivity(SQLDict)