SQLDict.py 30.6 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import random
30
from DateTime import DateTime
Jean-Paul Smets's avatar
Jean-Paul Smets committed
31
from Products.CMFActivity.ActivityTool import registerActivity
32
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY
Jean-Paul Smets's avatar
Jean-Paul Smets committed
33
from RAMDict import RAMDict
34
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
35
from Products.CMFActivity.Errors import ActivityFlushError
36
from ZODB.POSException import ConflictError
37
import sys
38
import sha
39
from types import ClassType, StringType, ListType, TupleType
Jean-Paul Smets's avatar
Jean-Paul Smets committed
40

41 42 43 44 45
try:
  from transaction import get as get_transaction
except ImportError:
  pass

Yoshinori Okuji's avatar
Yoshinori Okuji committed
46
from zLOG import LOG, TRACE, WARNING, ERROR
Jean-Paul Smets's avatar
Jean-Paul Smets committed
47

48
MAX_PRIORITY = 5
49
MAX_GROUPED_OBJECTS = 500
Jean-Paul Smets's avatar
Jean-Paul Smets committed
50

51 52 53 54 55 56
priority_weight = \
  [1] * 64 + \
  [2] * 20 + \
  [3] * 10 + \
  [4] * 5 + \
  [5] * 1
57

58

Jean-Paul Smets's avatar
Jean-Paul Smets committed
59 60 61 62 63 64
class SQLDict(RAMDict):
  """
    A simple OOBTree based queue. It should be compatible with transactions
    and provide sequentiality. Should not create conflict
    because use of OOBTree.
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
65
  # Transaction commit methods
66
  def prepareQueueMessage(self, activity_tool, m):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
67
    if m.is_registered:
68
      activity_tool.SQLDict_writeMessage( path = '/'.join(m.object_path) ,
Jean-Paul Smets's avatar
Jean-Paul Smets committed
69 70
                                          method_id = m.method_id,
                                          priority = m.activity_kw.get('priority', 1),
71
                                          broadcast = m.activity_kw.get('broadcast', 0),
72
                                          message = self.dumpMessage(m),
73
                                          date = m.activity_kw.get('at_date', DateTime()),
74
                                          group_method_id = m.activity_kw.get('group_method_id', ''),
75 76
                                          tag = m.activity_kw.get('tag', ''),
                                          order_validation_text = self.getOrderValidationText(m))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
77
                                          # Also store uid of activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
78

79 80 81 82 83 84
  def prepareQueueMessageList(self, activity_tool, message_list):
    registered_message_list = []
    for message in message_list:
      if message.is_registered:
        registered_message_list.append(message)
    if len(registered_message_list) > 0:
85
      #LOG('SQLDict prepareQueueMessageList', 0, 'registered_message_list = %r' % (registered_message_list,))
86 87 88 89 90
      path_list = ['/'.join(message.object_path) for message in registered_message_list]
      method_id_list = [message.method_id for message in registered_message_list]
      priority_list = [message.activity_kw.get('priority', 1) for message in registered_message_list]
      broadcast_list = [message.activity_kw.get('broadcast', 0) for message in registered_message_list]
      dumped_message_list = [self.dumpMessage(message) for message in registered_message_list]
91 92 93
      datetime = DateTime()
      date_list = [message.activity_kw.get('at_date', datetime) for message in registered_message_list]
      group_method_id_list = [message.activity_kw.get('group_method_id', '') for message in registered_message_list]
94
      tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
95
      order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list]
96 97 98
      uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity', id_count=len(registered_message_list))
      activity_tool.SQLDict_writeMessageList( uid_list = uid_list,
                                              path_list = path_list,
99 100 101 102
                                              method_id_list = method_id_list,
                                              priority_list = priority_list,
                                              broadcast_list = broadcast_list,
                                              message_list = dumped_message_list,
103
                                              date_list = date_list,
104
                                              group_method_id_list = group_method_id_list,
105 106
                                              tag_list = tag_list,
                                              order_validation_text_list = order_validation_text_list)
107

108 109
  def prepareDeleteMessage(self, activity_tool, m):
    # Erase all messages in a single transaction
Jean-Paul Smets's avatar
Jean-Paul Smets committed
110
    path = '/'.join(m.object_path)
111 112
    order_validation_text = self.getOrderValidationText(m)
    uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = m.method_id,
113
                                                 order_validation_text = order_validation_text,
114
                                                 processing_node = None)
115
    uid_list = [x.uid for x in uid_list]
116
    if len(uid_list)>0:
117 118 119
      activity_tool.SQLDict_delMessage(uid = uid_list)

  # Registration management
Jean-Paul Smets's avatar
Jean-Paul Smets committed
120
  def registerActivityBuffer(self, activity_buffer):
121
    pass
122

Jean-Paul Smets's avatar
Jean-Paul Smets committed
123
  def isMessageRegistered(self, activity_buffer, activity_tool, m):
124
    uid_set = activity_buffer.getUidSet(self)
125
    return (tuple(m.object_path), m.method_id, m.activity_kw.get('tag')) in uid_set
126

Jean-Paul Smets's avatar
Jean-Paul Smets committed
127 128
  def registerMessage(self, activity_buffer, activity_tool, m):
    m.is_registered = 1
129
    uid_set = activity_buffer.getUidSet(self)
130
    uid_set.add((tuple(m.object_path), m.method_id, m.activity_kw.get('tag')))
131 132
    message_list = activity_buffer.getMessageList(self)
    message_list.append(m)
133

Jean-Paul Smets's avatar
Jean-Paul Smets committed
134 135
  def unregisterMessage(self, activity_buffer, activity_tool, m):
    m.is_registered = 0 # This prevents from inserting deleted messages into the queue
136
    class_name = self.__class__.__name__
137
    uid_set = activity_buffer.getUidSet(self)
138
    uid_set.discard((tuple(m.object_path), m.method_id, m.activity_kw.get('tag')))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
139 140

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
141 142
    message_list = activity_buffer.getMessageList(self)
    return [m for m in message_list if m.is_registered]
143

144 145 146 147 148 149 150 151 152 153
  def getOrderValidationText(self, message):
    # Return an identifier of validators related to ordering.
    order_validation_item_list = []
    key_list = message.activity_kw.keys()
    key_list.sort()
    for key in key_list:
      method_id = "_validate_%s" % key
      if hasattr(self, method_id):
        order_validation_item_list.append((key, message.activity_kw[key]))
    if len(order_validation_item_list) == 0:
154 155 156 157 158 159
      # When no order validation argument is specified, skip the computation
      # of the checksum for speed. Here, 'none' is used, because this never be
      # identical to SHA1 hexdigest (which is always 40 characters), and 'none'
      # is true in Python. This is important, because dtml-if assumes that an empty
      # string is false, so we must use a non-empty string for this.
      return 'none'
160
    return sha.new(repr(order_validation_item_list)).hexdigest()
161

162
  def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
163 164 165 166 167 168 169
    validation_state = message.validate(self, activity_tool)
    if validation_state is not VALID:
      if validation_state in (EXCEPTION, INVALID_PATH):
        # There is a serious validation error - we must lower priority
        if priority > MAX_PRIORITY:
          # This is an error
          if len(uid_list) > 0:
170
            #LOG('SQLDict', 0, 'error uid_list = %r' % (uid_list,))
171 172 173 174 175
            activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
                                                                            # Assign message back to 'error' state
          #m.notifyUser(activity_tool)                                      # Notify Error
          get_transaction().commit()                                        # and commit
        else:
176
          #LOG('SQLDict', 0, 'lower priority uid_list = %r' % (uid_list,))
177 178
          # Lower priority
          if len(uid_list) > 0: # Add some delay before new processing
179 180
            activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
                                              priority = priority + 1, retry = 1)
181 182 183
          get_transaction().commit() # Release locks before starting a potentially long calculation
      else:
        # We do not lower priority for INVALID_ORDER errors but we do postpone execution
184 185
        #order_validation_text = self.getOrderValidationText(message)
        activity_tool.SQLDict_setPriority(uid = uid_list,
186
                                          delay = VALIDATION_ERROR_DELAY,
187
                                          retry = 1)
188 189 190
        get_transaction().commit() # Release locks before starting a potentially long calculation
      return 0
    return 1
191

Jean-Paul Smets's avatar
Jean-Paul Smets committed
192
  # Queue semantic
Jean-Paul Smets's avatar
Jean-Paul Smets committed
193
  def dequeueMessage(self, activity_tool, processing_node):
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
    readMessage = getattr(activity_tool, 'SQLDict_readMessage', None)
    if readMessage is None:
      return 1

    now_date = DateTime()
    priority = random.choice(priority_weight)
    # Try to find a message at given priority level which is scheduled for now
    result = readMessage(processing_node=processing_node, priority=priority,
                         to_date=now_date)
    if len(result) == 0:
      # If empty, take any message which is scheduled for now
      priority = None
      result = readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
    if len(result) == 0:
      # If the result is still empty, shift the dates so that SQLDict can dispatch pending active
      # objects quickly.
      self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
211
    else:
212 213 214 215 216 217
      #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
      line = result[0]
      path = line.path
      method_id = line.method_id
      order_validation_text = line.order_validation_text
      uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
218 219
                                                   processing_node = None, to_date = now_date,
                                                   order_validation_text = order_validation_text)
220 221 222 223 224 225 226 227 228
      uid_list = [x.uid for x in uid_list]
      uid_list_list = [uid_list]
      priority_list = [line.priority]
      # Make sure message can not be processed anylonger
      if len(uid_list) > 0:
        # Set selected messages to processing
        activity_tool.SQLDict_processMessage(uid = uid_list)
      get_transaction().commit() # Release locks before starting a potentially long calculation
      # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
229 230 231 232 233 234 235 236 237 238

      # At this point, messages are marked as processed. So catch any kind of exception to make sure
      # that they are unmarked on error.
      try:
        m = self.loadMessage(line.message, uid = line.uid)
        message_list = [m]
        # Validate message (make sure object exists, priority OK, etc.)
        if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
          return 0

239 240 241 242
        group_method_id = m.activity_kw.get('group_method_id')
        if group_method_id is not None:
          # Count the number of objects to prevent too many objects.
          if m.hasExpandMethod():
243
            count = len(m.getObjectList(activity_tool))
244
          else:
245 246 247 248 249 250 251
            count = 1

          group_method = activity_tool.restrictedTraverse(group_method_id)

          if count < MAX_GROUPED_OBJECTS:
            # Retrieve objects which have the same group method.
            result = readMessage(processing_node = processing_node, priority = priority,
252 253
                                to_date = now_date, group_method_id = group_method_id,
                                order_validation_text = order_validation_text)
254
            #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
255
            path_and_method_id_dict = {}
256 257 258
            for line in result:
              path = line.path
              method_id = line.method_id
259 260 261 262 263 264 265

              # Prevent using the same pair of a path and a method id.
              key = (path, method_id)
              if key in path_and_method_id_dict:
                continue
              path_and_method_id_dict[key] = 1

266 267 268 269 270 271 272 273
              uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
                                                            processing_node = None, to_date = now_date,
                                                            order_validation_text = order_validation_text)
              uid_list = [x.uid for x in uid_list]
              if len(uid_list) > 0:
                # Set selected messages to processing
                activity_tool.SQLDict_processMessage(uid = uid_list)
              get_transaction().commit() # Release locks before starting a potentially long calculation
274 275 276 277

              # Save this newly marked uids as soon as possible.
              uid_list_list.append(uid_list)

278 279 280
              m = self.loadMessage(line.message, uid = line.uid)
              if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
                if m.hasExpandMethod():
281
                  count += len(m.getObjectList(activity_tool))
282 283 284 285 286 287
                else:
                  count += 1
                message_list.append(m)
                priority_list.append(line.priority)
                if count >= MAX_GROUPED_OBJECTS:
                  break
288 289 290 291
              else:
                # If the uids were not valid, remove them from the list, as validateMessage
                # unmarked them.
                uid_list_list.pop()
292

293 294
          # Release locks before starting a potentially long calculation
          get_transaction().commit()
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317

        # Try to invoke
        if group_method_id is not None:
          LOG('SQLDict', TRACE,
              'invoking a group method %s with %d objects '\
              ' (%d objects in expanded form)' % (
              group_method_id, len(message_list), count))
          activity_tool.invokeGroup(group_method_id, message_list)
        else:
          activity_tool.invoke(message_list[0])

        # Check if messages are executed successfully.
        # When some of them are executed successfully, it may not be acceptable to
        # abort the transaction, because these remain pending, only due to other
        # invalid messages. This means that a group method should not be used if
        # it has a side effect. For now, only indexing uses a group method, and this
        # has no side effect.
        for m in message_list:
          if m.is_executed:
            get_transaction().commit()
            break
        else:
          get_transaction().abort()
318
      except:
319 320 321 322 323 324 325
        # If an exception occurs, abort the transaction to minimize the impact,
        try:
          get_transaction().abort()
        except:
          # Unfortunately, database adapters may raise an exception against abort.
          LOG('SQLDict', WARNING, 'abort failed, thus some objects may be modified accidentally')
          pass
326 327 328

        # An exception happens at somewhere else but invoke or invokeGroup, so messages
        # themselves should not be delayed.
Yoshinori Okuji's avatar
Yoshinori Okuji committed
329
        try:
330 331 332 333 334
          for uid_list in uid_list_list:
            if len(uid_list):
              # This only sets processing to zero.
              activity_tool.SQLDict_setPriority(uid = uid_list)
              get_transaction().commit()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
335 336 337 338
        except:
          LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
              error=sys.exc_info())
          raise
339
        return 0
340

Yoshinori Okuji's avatar
Yoshinori Okuji committed
341 342 343 344 345 346
      try:
        for i in xrange(len(message_list)):
          m = message_list[i]
          uid_list = uid_list_list[i]
          priority = priority_list[i]
          if m.is_executed:
347
            if len(uid_list) > 0:
Yoshinori Okuji's avatar
Yoshinori Okuji committed
348 349 350 351 352 353 354
              activity_tool.SQLDict_delMessage(uid = uid_list)                # Delete it
            get_transaction().commit()                                        # If successful, commit
            if m.active_process:
              active_process = activity_tool.unrestrictedTraverse(m.active_process)
              if not active_process.hasActivity():
                # No more activity
                m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
355
          else:
Yoshinori Okuji's avatar
Yoshinori Okuji committed
356 357
            if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
              # If this is a conflict error, do not lower the priority but only delay.
358
              activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
Yoshinori Okuji's avatar
Yoshinori Okuji committed
359
                                                retry = 1)
360
              get_transaction().commit() # Release locks before starting a potentially long calculation
Yoshinori Okuji's avatar
Yoshinori Okuji committed
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
            elif priority > MAX_PRIORITY:
              # This is an error
              if len(uid_list) > 0:
                activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
                                                                                # Assign message back to 'error' state
              m.notifyUser(activity_tool)                                       # Notify Error
              get_transaction().commit()                                        # and commit
            else:
              # Lower priority
              if len(uid_list) > 0:
                activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
                                                  priority = priority + 1, retry = 1)
                get_transaction().commit() # Release locks before starting a potentially long calculation
      except:
        LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages',
            error=sys.exc_info())
        raise
378 379 380

      return 0
    get_transaction().commit() # Release locks before starting a potentially long calculation
Jean-Paul Smets's avatar
Jean-Paul Smets committed
381 382
    return 1

383
  def hasActivity(self, activity_tool, object, **kw):
384 385
    hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
    if hasMessage is not None:
386 387
      if object is not None:
        my_object_path = '/'.join(object.getPhysicalPath())
388
        result = hasMessage(path=my_object_path, **kw)
389 390 391 392
        if len(result) > 0:
          return result[0].message_count > 0
      else:
        return 1 # Default behaviour if no object specified is to return 1 until active_process implemented
Jean-Paul Smets's avatar
Jean-Paul Smets committed
393 394
    return 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
395
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
396 397
    """
      object_path is a tuple
Jean-Paul Smets's avatar
Jean-Paul Smets committed
398 399 400 401 402 403

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used
404 405

      NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
Jean-Paul Smets's avatar
Jean-Paul Smets committed
406 407
    """
    path = '/'.join(object_path)
408
    # LOG('Flush', 0, str((path, invoke, method_id)))
409
    method_dict = {}
410 411
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is not None:
412 413 414
      # Parse each message in registered
      for m in activity_tool.getRegisteredMessageList(self):
        if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id):
415
          #if not method_dict.has_key(method_id or m.method_id):
416 417
          if not method_dict.has_key(m.method_id):
            method_dict[m.method_id] = 1 # Prevents calling invoke twice
418 419
            if invoke:
              # First Validate
420 421
              validate_value = m.validate(self, activity_tool)
              if validate_value is VALID:
422 423 424 425
                activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
                if not m.is_executed:                                                 # Make sure message could be invoked
                  # The message no longer exists
                  raise ActivityFlushError, (
426
                      'Could not evaluate %s on %s' % (m.method_id , path))
427
              elif validate_value is INVALID_PATH:
428 429
                # The message no longer exists
                raise ActivityFlushError, (
430
                    'The document %s does not exist' % path)
431 432 433 434
              else:
                raise ActivityFlushError, (
                    'Could not validate %s on %s' % (m.method_id , path))
          activity_tool.unregisterMessage(self, m)
435
      # Parse each message in SQL dict
436 437
      result = readMessageList(path=path, method_id=method_id,
                               processing_node=None,include_processing=0)
438 439
      for line in result:
        path = line.path
440 441
        line_method_id = line.method_id
        if not method_dict.has_key(line_method_id):
442
          # Only invoke once (it would be different for a queue)
443 444
          # This is optimisation with the goal to process objects on the same
          # node and minimize network traffic with ZEO server
445
          method_dict[line_method_id] = 1
446
          m = self.loadMessage(line.message, uid = line.uid)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
447 448
          if invoke:
            # First Validate
449
            validate_value = m.validate(self, activity_tool)
Romain Courteaud's avatar
Romain Courteaud committed
450
#             LOG('SQLDict.flush validate_value',0,validate_value)
451
            if validate_value is VALID:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
452
              activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
Romain Courteaud's avatar
Romain Courteaud committed
453
#               LOG('SQLDict.flush m.is_executed',0,m.is_executed)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
454 455 456
              if not m.is_executed:                                                 # Make sure message could be invoked
                # The message no longer exists
                raise ActivityFlushError, (
457
                    'Could not evaluate %s on %s' % (m.method_id , path))
458
            elif validate_value is INVALID_PATH:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
459 460
              # The message no longer exists
              raise ActivityFlushError, (
461
                  'The document %s does not exist' % path)
462 463 464
            else:
              raise ActivityFlushError, (
                  'Could not validate %s on %s' % (m.method_id , path))
465 466

      if len(result):
467
        uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
468
                                                     processing_node = None,)
469 470
        if len(uid_list)>0:
          activity_tool.SQLDict_delMessage(uid = [x.uid for x in uid_list])
Jean-Paul Smets's avatar
Jean-Paul Smets committed
471

472
  def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
473
    # YO: reading all lines might cause a deadlock
Jean-Paul Smets's avatar
Jean-Paul Smets committed
474
    message_list = []
475 476 477 478
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is not None:
      result = readMessageList(path=None, method_id=None, processing_node=None,
                               to_processing_date=None,include_processing=include_processing)
479 480 481 482
      for line in result:
        m = self.loadMessage(line.message, uid = line.uid)
        m.processing_node = line.processing_node
        m.priority = line.priority
483
        m.processing = line.processing
484
        message_list.append(m)
485 486
    return message_list

487 488 489
  def dumpMessageList(self, activity_tool):
    # Dump all messages in the table.
    message_list = []
490 491 492
    dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
    if dumpMessageList is not None:
      result = dumpMessageList()
493 494 495 496 497
      for line in result:
        m = self.loadMessage(line.message, uid = line.uid)
        message_list.append(m)
    return message_list

Jean-Paul Smets's avatar
Jean-Paul Smets committed
498 499
  def distribute(self, activity_tool, node_count):
    processing_node = 1
500 501
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is not None:
502 503 504 505 506 507
      now_date = DateTime()
      if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
        # Sticky processing messages should be set back to non processing
        max_processing_date = now_date - MAX_PROCESSING_TIME
        self.max_processing_date = now_date
      else:
508
        max_processing_date = None
509 510 511
      result = readMessageList(path=None, method_id=None, processing_node = -1,
                               to_processing_date = max_processing_date,
                               include_processing=0) # Only assign non assigned messages
512 513 514 515
      get_transaction().commit() # Release locks before starting a potentially long calculation
      path_dict = {}
      for line in result:
        path = line.path
516 517 518 519 520
        broadcast = line.broadcast
        if broadcast:
          # Broadcast messages must be distributed into all nodes.
          uid = line.uid
          activity_tool.SQLDict_assignMessage(processing_node=1, uid=[uid])
Yoshinori Okuji's avatar
Yoshinori Okuji committed
521
          if node_count > 1:
522
            uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity', id_count=node_count - 1)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
523
            for node in range(2, node_count+1):
524 525
              activity_tool.SQLDict_writeMessage( uid = uid_list.pop(),
                                                  path = path,
Yoshinori Okuji's avatar
Yoshinori Okuji committed
526 527 528 529 530 531
                                                  method_id = line.method_id,
                                                  priority = line.priority,
                                                  broadcast = 1,
                                                  processing_node = node,
                                                  message = line.message,
                                                  date = line.date)
532
        elif not path_dict.has_key(path):
533 534
          # Only assign once (it would be different for a queue)
          path_dict[path] = 1
535
          activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None, broadcast=0)
536 537 538 539
          get_transaction().commit() # Release locks immediately to allow processing of messages
          processing_node = processing_node + 1
          if processing_node > node_count:
            processing_node = 1 # Round robin
Jean-Paul Smets's avatar
Jean-Paul Smets committed
540

541 542 543
  # Validation private methods
  def _validate_after_method_id(self, activity_tool, message, value):
    # Count number of occurances of method_id
544
    if type(value) is StringType:
545
      value = [value]
546 547 548 549
    if len(value)>0: # if empty list provided, the message is valid
      result = activity_tool.SQLDict_validateMessageList(method_id=value, message_uid=None, path=None)
      if result[0].uid_count > 0:
        return INVALID_ORDER
550
    return VALID
551

552 553
  def _validate_after_path(self, activity_tool, message, value):
    # Count number of occurances of path
554
    if type(value) is StringType:
555
      value = [value]
556 557 558 559
    if len(value)>0: # if empty list provided, the message is valid
      result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, path=value)
      if result[0].uid_count > 0:
        return INVALID_ORDER
560
    return VALID
561

562 563 564 565 566 567 568
  def _validate_after_message_uid(self, activity_tool, message, value):
    # Count number of occurances of message_uid
    result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=value, path=None)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID

569 570
  def _validate_after_path_and_method_id(self, activity_tool, message, value):
    # Count number of occurances of path and method_id
571
    if (type(value) != TupleType and type(value) != ListType) or len(value)<2:
572 573 574 575
      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method_id : %s' % repr(value))
      return VALID
    path = value[0]
    method = value[1]
576
    if type(path) is StringType:
577
      path = [path]
578
    if type(method) is StringType:
579 580 581 582 583 584
      method = [method]
    result = activity_tool.SQLDict_validateMessageList(method_id=method, message_uid=None, path=path)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID

585 586
  def _validate_after_tag(self, activity_tool, message, value):
    # Count number of occurances of tag
587
    if self.countMessageWithTag(activity_tool, value) > 0:
588 589
      return INVALID_ORDER
    return VALID
590

Sebastien Robin's avatar
Sebastien Robin committed
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
  def countMessage(self, activity_tool, tag=None,path=None,
                   method_id=None,message_uid=None,**kw):
    """
      Return the number of message which match the given parameter.
    """
    if isinstance(tag, StringType):
      tag = [tag]
    if isinstance(path, StringType):
      path = [path]
    if isinstance(message_uid, (int,long)):
      message_uid = [message_uid]
    if isinstance(method_id, StringType):
      method_id = [method_id]
    result = activity_tool.SQLDict_validateMessageList(method_id=method_id, 
                                                       path=path,
                                                       message_uid=message_uid, 
                                                       tag=tag)
    return result[0].uid_count

610 611 612 613
  def countMessageWithTag(self, activity_tool, value):
    """
      Return the number of message which match the given tag.
    """
Sebastien Robin's avatar
Sebastien Robin committed
614
    return self.countMessage(activity_tool,tag=value)
615

616 617
  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
    # Count number of occurances of tag and method_id
618
    if (type(value) != TupleType and type(value) != ListType) or len(value)<2:
619 620 621 622
      LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value))
      return VALID
    tag = value[0]
    method = value[1]
623
    if type(tag) is StringType:
624
      tag = [tag]
625
    if type(method) is StringType:
626 627 628 629 630 631
      method = [method]
    result = activity_tool.SQLDict_validateMessageList(method_id=method, message_uid=None, tag=tag)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID

632
  # Required for tests (time shift)
633
  def timeShift(self, activity_tool, delay, processing_node=None,retry=None):
634 635 636 637
    """
      To simulate timeShift, we simply substract delay from
      all dates in SQLDict message table
    """
638
    activity_tool.SQLDict_timeShift(delay = delay, processing_node = processing_node,retry=retry)
639

Jean-Paul Smets's avatar
Jean-Paul Smets committed
640
registerActivity(SQLDict)