SQLDict.py 29.7 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
30
from Queue import VALID, INVALID_PATH, abortTransactionSynchronously
Jean-Paul Smets's avatar
Jean-Paul Smets committed
31
from RAMDict import RAMDict
32
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
33
from Products.CMFActivity.Errors import ActivityFlushError
34
from ZODB.POSException import ConflictError
35
import sys
36
from types import ClassType
37
#from time import time
38
from SQLBase import SQLBase, sort_message_key
39 40
from Products.CMFActivity.ActivityRuntimeEnvironment import (
  ActivityRuntimeEnvironment, getTransactionalVariable)
41
from zExceptions import ExceptionFormatter
Jean-Paul Smets's avatar
Jean-Paul Smets committed
42

43 44 45 46 47
try:
  from transaction import get as get_transaction
except ImportError:
  pass

48
from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
Jean-Paul Smets's avatar
Jean-Paul Smets committed
49

50 51
# Stop validating more messages when this limit is reached
# (distribute() returns once it has assigned at least this many messages).
MAX_VALIDATED_LIMIT = 1000
# Read up to this number of messages to validate
# (page size of each read performed by distribute()).
READ_MESSAGE_LIMIT = 1000
# Stop electing more messages for processing if more than this number of
# objects are impacted by elected messages.
MAX_GROUPED_OBJECTS = 100

# Maximum number of messages handed to a single SQLDict_writeMessageList
# call by prepareQueueMessageList.
MAX_MESSAGE_LIST_SIZE = 100

60
class SQLDict(RAMDict, SQLBase):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
61 62 63 64 65
  """
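    A simple SQL table based queue with dictionary semantics: messages
    considered identical (see generateMessageUID) are unified. It should be
    compatible with transactions and provide sequentiality. Messages are
    stored through the SQLDict_* methods of the activity tool, not in an
    OOBTree.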
  """
66 67
  # Name of the SQL table holding the messages; passed as `table` to the
  # SQLBase_* methods of the activity tool.
  sql_table = 'message'

Jean-Paul Smets's avatar
Jean-Paul Smets committed
68
  # Transaction commit methods
69
  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Store every still-registered message of message_list in the SQL table.

      Messages are written in batches of at most MAX_MESSAGE_LIST_SIZE, one
      SQLDict_writeMessageList call per batch. Each call receives parallel
      lists (one entry per message) and a list of freshly generated uids.
    """
    def getParameter(message, name, default):
      # Read one activity parameter, falling back on the given default.
      return message.activity_kw.get(name, default)

    pending_list = [message for message in message_list
                    if message.is_registered]
    for start in xrange(0, len(pending_list), MAX_MESSAGE_LIST_SIZE):
      batch = pending_list[start:start + MAX_MESSAGE_LIST_SIZE]
      path_list = ['/'.join(message.object_path) for message in batch]
      active_process_uid_list = [message.active_process_uid
                                 for message in batch]
      method_id_list = [message.method_id for message in batch]
      priority_list = [getParameter(message, 'priority', 1)
                       for message in batch]
      dumped_message_list = [self.dumpMessage(message) for message in batch]
      date_list = [getParameter(message, 'at_date', None)
                   for message in batch]
      # group_method_id and group_id are stored together, NUL-separated.
      group_method_id_list = [
        '\0'.join((getParameter(message, 'group_method_id', ''),
                   getParameter(message, 'group_id', '')))
        for message in batch]
      tag_list = [getParameter(message, 'tag', '') for message in batch]
      serialization_tag_list = [getParameter(message, 'serialization_tag', '')
                                for message in batch]
      order_validation_text_list = [self.getOrderValidationText(message)
                                    for message in batch]
      # The uid_list is also stored in the ZODB (generated by portal_ids).
      id_tool = activity_tool.getPortalObject().portal_ids
      uid_list = id_tool.generateNewIdList(id_generator='uid',
                                           id_group='portal_activity',
                                           id_count=len(batch))
      activity_tool.SQLDict_writeMessageList(
        uid_list=uid_list,
        path_list=path_list,
        active_process_uid_list=active_process_uid_list,
        method_id_list=method_id_list,
        priority_list=priority_list,
        message_list=dumped_message_list,
        date_list=date_list,
        group_method_id_list=group_method_id_list,
        tag_list=tag_list,
        serialization_tag_list=serialization_tag_list,
        processing_node_list=None,
        order_validation_text_list=order_validation_text_list)
101

102 103
  def prepareDeleteMessage(self, activity_tool, m):
    """
      Erase, in a single SQLBase_delMessage call, all stored messages which
      share the path, method_id and order validation text of m.
      Nothing is deleted when no stored message matches.
    """
    message_path = '/'.join(m.object_path)
    validation_text = self.getOrderValidationText(m)
    matching_line_list = activity_tool.SQLDict_readUidList(
      path=message_path,
      method_id=m.method_id,
      order_validation_text=validation_text)
    uid_list = [matching_line.uid for matching_line in matching_line_list]
    if not uid_list:
      return
    activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
111

112 113 114 115 116 117 118 119
  def finishQueueMessage(self, activity_tool_path, m):
    """
      Nothing to do in SQLDict.
    """
    return None

  def finishDeleteMessage(self, activity_tool_path, m):
    """
      Nothing to do in SQLDict.
    """
    return None

120
  # Registration management
Jean-Paul Smets's avatar
Jean-Paul Smets committed
121
  def registerActivityBuffer(self, activity_buffer):
    """
      Registration hook: SQLDict keeps no state on the activity buffer,
      so there is nothing to do.
    """
    return None
123

124 125
  def generateMessageUID(self, m):
    """
      Return the key under which m is unified with other messages:
      (object_path as a tuple, method_id, tag, group_id).
      tag and group_id are None when absent from m.activity_kw.
    """
    activity_kw = m.activity_kw
    return (
      tuple(m.object_path),
      m.method_id,
      activity_kw.get('tag'),
      activity_kw.get('group_id'),
    )
126

Jean-Paul Smets's avatar
Jean-Paul Smets committed
127 128
  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """
      Unregister message m from the activity buffer.

      m is flagged as not registered, which prevents deleted messages from
      being inserted into the queue, and its key (see generateMessageUID)
      is dropped from the uid set the buffer keeps for this activity.
      Unregistering an unknown message is harmless (set.discard).
    """
    m.is_registered = 0
    activity_buffer.getUidSet(self).discard(self.generateMessageUID(m))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
132 133

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """
      Return the messages the buffer holds for this activity which are
      still flagged as registered.
    """
    registered_message_list = []
    for message in activity_buffer.getMessageList(self):
      if message.is_registered:
        registered_message_list.append(message)
    return registered_message_list
136

137
  def getReservedMessageList(self, activity_tool, date, processing_node, limit=None, group_method_id=None):
    """
      Get and reserve a list of messages.

      When no group_method_id is given, messages already reserved for
      processing_node are returned if there are any. Otherwise (or when a
      group_method_id is given) new messages are reserved first, then the
      reserved messages are read back.

      limit
        Maximum number of messages to fetch.
        This number is not guaranteed to be reached, because of:
         - not enough messages being pending execution
         - race condition (other nodes reserving the same messages at the
           same time)
        This number is guaranteed not to be exceeded.
        If None (or not given) no limit applies.
    """
    selectReserved = activity_tool.SQLDict_selectReservedMessageList
    reserved_line_list = None
    if not group_method_id:
      reserved_line_list = selectReserved(processing_node=processing_node,
                                          count=limit)
    if not reserved_line_list:
      activity_tool.SQLDict_reserveMessageList(
        count=limit,
        processing_node=processing_node,
        to_date=date,
        group_method_id=group_method_id)
      reserved_line_list = selectReserved(processing_node=processing_node,
                                          count=limit)
    return reserved_line_list

  def makeMessageListAvailable(self, activity_tool, uid_list):
    """
      Put messages back in processing_node=0 .
      No SQL call is made when uid_list is empty.
    """
    if len(uid_list) == 0:
      return
    activity_tool.SQLDict_makeMessageListAvailable(uid_list=uid_list)

164
  def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
    """
      Reserve unreserved messages matching given line.
      Return their uids.

      Matching is done on path, method_id, group_method_id and
      order_validation_text. When nothing matches, SQLDict_commit is called
      to release locks. On any exception the SQL transaction is rolled
      back and the exception is re-raised.
    """
    try:
      duplicate_line_list = activity_tool.SQLDict_selectDuplicatedLineList(
        path=line.path,
        method_id=line.method_id,
        group_method_id=line.group_method_id,
        order_validation_text=line.order_validation_text)
      uid_list = [duplicate_line.uid for duplicate_line in duplicate_line_list]
      if not uid_list:
        # Nothing to reserve: release locks.
        activity_tool.SQLDict_commit()
      else:
        activity_tool.SQLDict_reserveDuplicatedLineList(
          processing_node=processing_node,
          uid_list=uid_list)
    except:
      # Log, release the lock, then let the caller see the exception.
      LOG('SQLDict', WARNING, 'getDuplicateMessageUidList got an exception', error=sys.exc_info())
      activity_tool.SQLDict_rollback()
      raise
    return uid_list
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217

  def getProcessableMessageList(self, activity_tool, processing_node):
    """
      Reserve, for processing_node, the messages to execute next and return
      them already loaded.

      Always true:
        For each reserved message, delete redundant messages when it gets
        reserved (definitely lost, but they are expendable since redundant).

      - reserve a message
      - set reserved message to processing=1 state
      - if this message has a group_method_id:
        - reserve a bunch of BUNDLE_MESSAGE_COUNT messages
        - until number of impacted objects goes over MAX_GROUPED_OBJECTS
          - get one message from the reserved bunch (this message will be
            "needed")
          - increase the number of impacted objects
        - set "needed" reserved messages to processing=1 state
        - unreserve "unneeded" messages
      - return still-reserved message list and a group_method_id

      If any error happens in above described process, try to unreserve all
      messages already reserved in that process.
      If it fails, complain loudly that some messages might still be in an
      unclean state.

      Returned values:
        4-tuple:
          - list of messages
          - impacted object count
          - group_method_id
          - uid_to_duplicate_uid_list_dict (uid of a returned message ->
            uids of the duplicates reserved along with it)
        On error, ([], 0, None, {}) is returned.
    """
    # Local wrappers around the methods of the same name: they bind
    # activity_tool / processing_node / now_date and add TRACE logging.
    def getReservedMessageList(limit, group_method_id=None):
      line_list = self.getReservedMessageList(activity_tool=activity_tool,
                                              date=now_date,
                                              processing_node=processing_node,
                                              limit=limit,
                                              group_method_id=group_method_id)
      if len(line_list):
        LOG('SQLDict', TRACE, 'Reserved messages: %r' % ([x.uid for x in line_list]))
      return line_list
    def getDuplicateMessageUidList(line):
      uid_list = self.getDuplicateMessageUidList(activity_tool=activity_tool,
        line=line, processing_node=processing_node)
      if len(uid_list):
        LOG('SQLDict', TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
      return uid_list
    def makeMessageListAvailable(uid_list):
      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=uid_list)
    BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
    now_date = self.getNow(activity_tool)
    message_list = []
    # Number of objects impacted by the messages elected so far.
    count = 0
    group_method_id = None
    try:
      # First, reserve exactly one message: it decides the group_method_id.
      result = getReservedMessageList(limit=1)
      uid_to_duplicate_uid_list_dict = {}
      if len(result) > 0:
        line = result[0]
        uid = line.uid
        m = self.loadMessage(line.message, uid=uid, line=line)
        message_list.append(m)
        group_method_id = line.group_method_id
        activity_tool.SQLDict_processMessage(uid=[uid])
        uid_to_duplicate_uid_list_dict.setdefault(uid, []) \
          .extend(getDuplicateMessageUidList(line))
        # '\0' alone is what prepareQueueMessageList stores when neither
        # group_method_id nor group_id was given.
        if group_method_id not in (None, '', '\0'):
          # Count the number of objects to prevent too many objects.
          count += len(m.getObjectList(activity_tool))
          if count < MAX_GROUPED_OBJECTS:
            # Retrieve objects which have the same group method.
            result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT, group_method_id=group_method_id)
            # (path, method_id, order_validation_text) -> uid of the first
            # line seen with that key in this bundle.
            path_and_method_id_dict = {}
            unreserve_uid_list = []
            for line in result:
              # Skip the message reserved by the first call.
              if line.uid == uid:
                continue
              # All fetched lines have the same group_method_id and
              # processing_node.
              # Their dates are lower-than or equal-to now_date.
              # We read each line once so lines have distinct uids.
              # So what remains to be filtered on are path, method_id and
              # order_validation_text.
              key = (line.path, line.method_id, line.order_validation_text)
              original_uid = path_and_method_id_dict.get(key)
              if original_uid is not None:
                # Duplicate of a line already seen in this bundle.
                uid_to_duplicate_uid_list_dict.setdefault(original_uid, []).append(line.uid)
                continue
              path_and_method_id_dict[key] = line.uid
              uid_to_duplicate_uid_list_dict.setdefault(line.uid, []).extend(getDuplicateMessageUidList(line))
              if count < MAX_GROUPED_OBJECTS:
                m = self.loadMessage(line.message, uid=line.uid, line=line)
                count += len(m.getObjectList(activity_tool))
                message_list.append(m)
              else:
                # Object limit reached: this line will not be processed now.
                unreserve_uid_list.append(line.uid)
            activity_tool.SQLDict_processMessage(uid=[m.uid for m in message_list])
            # Unreserve extra messages as soon as possible.
            makeMessageListAvailable(unreserve_uid_list)
      return message_list, count, group_method_id, uid_to_duplicate_uid_list_dict
    except:
      LOG('SQLDict', WARNING, 'Exception while reserving messages.', error=sys.exc_info())
      # NOTE(review): only the uids of already-loaded messages are freed
      # here; reserved duplicates and bundle lines not yet loaded are not in
      # message_list - confirm whether leaving them reserved is intended.
      if len(message_list):
        to_free_uid_list = [m.uid for m in message_list]
        try:
          makeMessageListAvailable(to_free_uid_list)
        except:
          LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
        else:
          if len(to_free_uid_list):
            LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
      else:
        LOG('SQLDict', TRACE, '(no message was reserved)')
      return [], 0, None, {}
306 307 308

  # Queue semantic
  def dequeueMessage(self, activity_tool, processing_node):
    """
      Reserve the next processable message(s) for processing_node, invoke
      them, end the transaction and finalize their execution.

      Messages sharing a group_method_id are invoked together through
      activity_tool.invokeGroup; otherwise the single reserved message is
      invoked through activity_tool.invoke.

      If invocation raises, or if ending the transaction fails, the
      transaction is aborted and the messages (with their reserved
      duplicates) are made available again. An exception raised by the
      abort itself is logged as PANIC and propagated.

      Returns True when there was no message to process, False otherwise.
    """
    def makeMessageListAvailable(uid_list, uid_to_duplicate_uid_list_dict):
      # Free the given uids together with the duplicates reserved for them.
      final_uid_list = []
      for uid in uid_list:
        final_uid_list.append(uid)
        final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
      self.makeMessageListAvailable(activity_tool=activity_tool, uid_list=final_uid_list)
    message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
      self.getProcessableMessageList(activity_tool, processing_node)
    if message_list:
      # Remove group_id parameter from group_method_id
      if group_method_id is not None:
        group_method_id = group_method_id.split('\0')[0]
      if group_method_id not in (None, ""):
        method = activity_tool.invokeGroup
        args = (group_method_id, message_list)
        activity_runtime_environment = ActivityRuntimeEnvironment(None)
      else:
        method = activity_tool.invoke
        message = message_list[0]
        args = (message, )
        activity_runtime_environment = ActivityRuntimeEnvironment(message)
      # Commit right before executing messages.
      # As MySQL transaction does not start exactly at the same time as ZODB
      # transactions but a bit later, messages available might be called
      # on objects which are not available - or available in an old
      # version - to ZODB connector.
      # So all connectors must be committed now that we have selected
      # everything needed from MySQL to get a fresh view of ZODB objects.
      get_transaction().commit()
      tv = getTransactionalVariable(None)
      tv['activity_runtime_environment'] = activity_runtime_environment
      # Try to invoke
      try:
        method(*args)
      except:
        LOG('SQLDict', WARNING, 'Exception raised when invoking messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
        try:
          abortTransactionSynchronously()
        except:
          # Unfortunately, database adapters may raise an exception against abort.
          LOG('SQLDict', PANIC,
              'abort failed, thus some objects may be modified accidentally')
          raise
        # XXX Is it still useful to free messages now that this node is able
        #     to reselect them ?
        to_free_uid_list = [x.uid for x in message_list]
        try:
          makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
        except:
          LOG('SQLDict', ERROR, 'Failed to free messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
        else:
          LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
      # Abort if something failed.
      if [m for m in message_list if m.getExecutionState() == MESSAGE_NOT_EXECUTED]:
        endTransaction = abortTransactionSynchronously
      else:
        endTransaction = get_transaction().commit
      try:
        endTransaction()
      except:
        LOG('SQLDict', WARNING, 'Failed to end transaction for messages (uid, path, method_id) %r' % ([(m.uid, m.object_path, m.method_id) for m in message_list], ), error=sys.exc_info())
        if endTransaction == abortTransactionSynchronously:
          LOG('SQLDict', PANIC, 'Failed to abort executed messages. Some objects may be modified accidentally.')
        else:
          try:
            abortTransactionSynchronously()
          except:
            LOG('SQLDict', PANIC, 'Failed to abort executed messages which also failed to commit. Some objects may be modified accidentally.')
            raise
        exc_info = sys.exc_info()
        for m in message_list:
          m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info=exc_info, log=False)
        # The helper works on uids (it looks them up in
        # uid_to_duplicate_uid_list_dict and hands them to SQL), so it must
        # be given uids here too, not the Message objects themselves.
        to_free_uid_list = [m.uid for m in message_list]
        try:
          makeMessageListAvailable(to_free_uid_list, uid_to_duplicate_uid_list_dict)
        except:
          LOG('SQLDict', ERROR, 'Failed to free remaining messages: %r' % (to_free_uid_list, ), error=sys.exc_info())
        else:
          LOG('SQLDict', TRACE, 'Freed messages %r' % (to_free_uid_list, ))
      self.finalizeMessageExecution(activity_tool, message_list, uid_to_duplicate_uid_list_dict)
    get_transaction().commit()
    return not message_list
Jean-Paul Smets's avatar
Jean-Paul Smets committed
390

391
  def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
    """
      Tell whether at least one stored message matches the parameters.

      object may be None, in which case no path filter is passed to
      SQLDict_hasMessage. Returns 0 when the activity tool has no
      SQLDict_hasMessage method or when it returns no line.
    """
    hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
    if hasMessage is None:
      return 0
    object_path = None
    if object is not None:
      object_path = '/'.join(object.getPhysicalPath())
    line_list = hasMessage(path=object_path,
                           method_id=method_id,
                           only_valid=only_valid,
                           active_process_uid=active_process_uid)
    if len(line_list) > 0:
      return line_list[0].message_count > 0
    return 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
403
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove (and optionally invoke first) the messages for an object.

      object_path is a tuple
      invoke: when true, one message per method_id is validated and invoked
        before deletion; ActivityFlushError is raised if it cannot be
        validated or executed.
      method_id: when given, only messages for this method are considered.

      Messages still registered in the current transaction are handled
      first (and unregistered), then the messages stored in the SQL table.

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used (it is ignored by this
      implementation).

      NOTE: committing is very likely nonsense here. We should just avoid
      flushing as much as possible.

      Nothing is done when the activity tool has no SQLDict_readMessageList
      method.
    """
    def invokeOrRaise(m, path, must_validate):
      # Validate (if requested) then invoke m; raise ActivityFlushError as
      # soon as the message cannot be validated or executed.
      if must_validate:
        validate_value = m.validate(self, activity_tool)
      else:
        validate_value = VALID
      if validate_value is VALID:
        # Try to invoke the message - what happens if invoke calls
        # flushActivity ??
        activity_tool.invoke(m)
        if m.getExecutionState() != MESSAGE_EXECUTED:
          # The message could not be invoked
          raise ActivityFlushError(
              'Could not evaluate %s on %s' % (m.method_id , path))
      elif validate_value is INVALID_PATH:
        # The message no longer exists
        raise ActivityFlushError(
            'The document %s does not exist' % path)
      else:
        raise ActivityFlushError(
            'Could not validate %s on %s' % (m.method_id , path))

    path = '/'.join(object_path)
    # method_id -> 1 for every method already handled: SQLDict invokes a
    # given method only once per object.
    method_dict = {}
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is not None:
      # Parse each message in registered
      for m in activity_tool.getRegisteredMessageList(self):
        if m.object_path == object_path and (method_id is None or method_id == m.method_id):
          if m.method_id not in method_dict:
            method_dict[m.method_id] = 1 # Prevents calling invoke twice
            if invoke:
              invokeOrRaise(m, path, True)
          activity_tool.unregisterMessage(self, m)
      # Parse each message in SQL dict
      result = readMessageList(path=path, method_id=method_id,
                               processing_node=None,include_processing=0, to_date=None)
      for line in result:
        path = line.path
        line_method_id = line.method_id
        if line_method_id not in method_dict:
          # Only invoke once (it would be different for a queue)
          # This is optimisation with the goal to process objects on the same
          # node and minimize network traffic with ZEO server
          method_dict[line_method_id] = 1
          m = self.loadMessage(line.message, uid=line.uid, line=line)
          if invoke:
            # Validate only if message is marked as new
            # (processing_node == -1).
            invokeOrRaise(m, path, line.processing_node == -1)

      if len(result):
        uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
                                                     order_validation_text=None)
        if len(uid_list)>0:
          activity_tool.SQLBase_delMessage(table=self.sql_table,
                                           uid=[x.uid for x in uid_list])
Jean-Paul Smets's avatar
Jean-Paul Smets committed
482

483
  # Explicitly use SQLBase's implementation; RAMDict comes first in the
  # bases, so presumably its getMessageList would win otherwise (TODO
  # confirm against RAMDict).
  getMessageList = SQLBase.getMessageList
484

485 486 487
  def dumpMessageList(self, activity_tool):
    """
      Dump all messages in the table: return them loaded, as a list.
      The list is empty when the activity tool has no
      SQLDict_dumpMessageList method.
    """
    dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
    if dumpMessageList is None:
      return []
    return [self.loadMessage(line.message, uid=line.uid, line=line)
            for line in dumpMessageList()]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
496
  def distribute(self, activity_tool, node_count):
    """
      Validate new messages (processing_node=-1) whose date is reached and
      assign them to processing_node=0.

      Messages are read in pages of READ_MESSAGE_LIMIT. Within a page:
       - executable messages are collected by getExecutableMessageList,
       - messages with the same generateMessageUID key are unified: the
         first one according to sort_message_key is kept, the others are
         deleted,
       - among messages sharing a serialization_tag, only the first one
         (and those sharing its group_method_id) is let through.
      Returns as soon as a page is empty or at least MAX_VALIDATED_LIMIT
      messages have been assigned.

      node_count is not used by this implementation.
      Nothing is done when the activity tool has no SQLDict_readMessageList
      method.
    """
    offset = 0
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is not None:
      now_date = self.getNow(activity_tool)
      validated_count = 0
      while 1:
        result = readMessageList(path=None, method_id=None, processing_node=-1,
                                 to_date=now_date, include_processing=0,
                                 offset=offset, count=READ_MESSAGE_LIMIT)
        if not result:
          return
        get_transaction().commit()

        validation_text_dict = {'none': 1}
        # Filled by getExecutableMessageList with the executable messages.
        message_dict = {}
        for line in result:
          message = self.loadMessage(line.message, uid=line.uid, line=line,
            order_validation_text=line.order_validation_text)
          self.getExecutableMessageList(activity_tool, message, message_dict,
                                        validation_text_dict, now_date=now_date)

        if message_dict:
          message_unique_dict = {}
          serialization_tag_dict = {}
          distributable_uid_set = set()
          deletable_uid_list = []

          # remove duplicates
          # SQLDict considers object_path, method_id, tag and group_id (see
          # generateMessageUID) to unify activities, but ignores method
          # arguments. They are outside of semantics.
          for message in message_dict.itervalues():
            message_unique_dict.setdefault(self.generateMessageUID(message),
                                           []).append(message)
          for message_list in message_unique_dict.itervalues():
            if len(message_list) > 1:
              # Sort list of duplicates to keep the message with highest score
              message_list.sort(key=sort_message_key)
              deletable_uid_list += [m.uid for m in message_list[1:]]
            message = message_list[0]
            distributable_uid_set.add(message.uid)
            serialization_tag = message.activity_kw.get('serialization_tag')
            if serialization_tag is not None:
              serialization_tag_dict.setdefault(serialization_tag,
                                                []).append(message)
          # Don't let through if there is the same serialization tag in the
          # message dict. If there is the same serialization tag, only one can
          # be validated and others must wait.
          # But messages with group_method_id are exceptions. serialization_tag
          # does not stop validating together. Because those messages should
          # be processed together at once.
          for message_list in serialization_tag_dict.itervalues():
            if len(message_list) == 1:
              continue
            # Sort list of messages to validate the message with highest score
            message_list.sort(key=sort_message_key)
            group_method_id = message_list[0].activity_kw.get('group_method_id')
            for message in message_list[1:]:
              if group_method_id is None or \
                 group_method_id != message.activity_kw.get('group_method_id'):
                distributable_uid_set.remove(message.uid)
          if deletable_uid_list:
            activity_tool.SQLBase_delMessage(table=self.sql_table,
                                             uid=deletable_uid_list)
          distributable_count = len(distributable_uid_set)
          if distributable_count:
            activity_tool.SQLBase_assignMessage(table=self.sql_table,
              processing_node=0, uid=tuple(distributable_uid_set))
            validated_count += distributable_count
            if validated_count >= MAX_VALIDATED_LIMIT:
              return
        # NOTE(review): the offset advances by a full page although messages
        # assigned or deleted above presumably no longer match
        # processing_node=-1 - confirm that no pending message is skipped.
        offset += READ_MESSAGE_LIMIT
Jean-Paul Smets's avatar
Jean-Paul Smets committed
568

569
  # Validation private methods
570 571
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
                serialization_tag=None):
    """
      Return the loaded messages matching the given criteria.

      method_id, path and tag may each be a single string or a list; a
      single string is wrapped in a list before being passed to
      SQLDict_validateMessageList. When no criterion is given, no query is
      made and an empty list is returned.
    """
    def asList(value):
      # Wrap a lone string so the SQL method always gets a sequence.
      if isinstance(value, str):
        return [value]
      return value

    method_id = asList(method_id)
    path = asList(path)
    tag = asList(tag)
    if not (method_id or message_uid or path or tag or serialization_tag):
      return []
    line_list = activity_tool.SQLDict_validateMessageList(
      method_id=method_id,
      message_uid=message_uid,
      path=path,
      tag=tag,
      count=False,
      serialization_tag=serialization_tag)
    return [self.loadMessage(line.message,
                             line=line,
                             uid=line.uid,
                             order_validation_text=line.order_validation_text,
                             date=line.date,
                             processing_node=line.processing_node)
            for line in line_list]

  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """Return the number of messages which match the given parameters.

    tag, path and method_id may each be a single string or a list; a single
    string is wrapped in a list. Extra keyword arguments are ignored.
    """
    def asList(value):
      # Wrap a lone string so the SQL method always gets a sequence.
      if isinstance(value, str):
        return [value]
      return value

    tag = asList(tag)
    path = asList(path)
    method_id = asList(method_id)
    line_list = activity_tool.SQLDict_validateMessageList(
      method_id=method_id,
      path=path,
      message_uid=message_uid,
      tag=tag,
      serialization_tag=None,
      count=1)
    return line_list[0].uid_count

618
  def countMessageWithTag(self, activity_tool, value):
    """Return the number of messages which match the given tag.

    Shortcut for countMessage(activity_tool, tag=value).
    """
    message_count = self.countMessage(activity_tool, tag=value)
    return message_count
622

623
  # Required for tests (time shift)
624
  def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
    """
      Required for tests (time shift).

      To simulate a time shift, delay, processing_node and retry are simply
      forwarded to SQLDict_timeShift, which is expected to subtract delay
      from the dates in the SQLDict message table.
    """
    activity_tool.SQLDict_timeShift(
      delay=delay,
      processing_node=processing_node,
      retry=retry)
630

631 632 633 634 635
  def getPriority(self, activity_tool):
    """
      Return the priority computed by self._getPriority from the
      SQLDict_getPriority method of the activity tool, with the priority
      given by RAMDict.getPriority as default.
    """
    priority_method = activity_tool.SQLDict_getPriority
    default_priority = RAMDict.getPriority(self, activity_tool)
    return self._getPriority(activity_tool, priority_method, default_priority)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
636
# Hand the class to registerActivity (imported from ActivityTool) as a
# side effect of importing this module.
registerActivity(SQLDict)