SQLDict.py 30.3 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
from DateTime import DateTime
Jean-Paul Smets's avatar
Jean-Paul Smets committed
30
from Products.CMFActivity.ActivityTool import registerActivity
31 32
from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY, \
        abortTransactionSynchronously
Jean-Paul Smets's avatar
Jean-Paul Smets committed
33
from RAMDict import RAMDict
34
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
35
from Products.CMFActivity.Errors import ActivityFlushError
36
from ZODB.POSException import ConflictError
37
import sys
38
from types import ClassType
Jean-Paul Smets's avatar
Jean-Paul Smets committed
39

40 41 42 43 44
try:
  from transaction import get as get_transaction
except ImportError:
  pass

45
from zLOG import LOG, TRACE, WARNING, ERROR, INFO
Jean-Paul Smets's avatar
Jean-Paul Smets committed
46

47
# Messages whose priority is greater than this value are put into an error
# state instead of being retried (see validateMessage and dequeueMessage).
MAX_PRIORITY = 5
# dequeueMessage stops adding messages to a group once this many objects
# have been collected.
MAX_GROUPED_OBJECTS = 500

# Table of 100 priority values in which priority 1 appears 64 times,
# 2 appears 20 times, 3 ten times, 4 five times and 5 once.
# It is not referenced in the visible part of this file.
priority_weight = \
  [1] * 64 + \
  [2] * 20 + \
  [3] * 10 + \
  [4] * 5 + \
  [5] * 1

# Node at which distribute() resumes its round robin on its next call.
LAST_PROCESSING_NODE = 1
58

Jean-Paul Smets's avatar
Jean-Paul Smets committed
59 60 61 62 63 64
class SQLDict(RAMDict):
  """
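    A dictionary-like activity queue whose messages are written and read
    through the SQLDict_* SQL methods of the activity tool. It should be
    compatible with transactions and provide sequentiality.
    NOTE(review): this text used to describe an OOBTree based queue that
    should not create conflicts; no OOBTree appears in the visible part of
    this class, so that statement probably came from RAMDict - confirm.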
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
65
  # Transaction commit methods
66
  def prepareQueueMessage(self, activity_tool, m):
    """
      Write one message into the SQL queue through SQLDict_writeMessage.

      Nothing is written when m.is_registered is false. Missing activity
      keywords default to priority 1, broadcast 0, the current date, an
      empty tag and empty group method id / group id.
    """
    if not m.is_registered:
      return
    activity_kw = m.activity_kw
    # group_method_id and group_id are stored together, separated by a NUL
    # character.
    group_method_id = '\0'.join([activity_kw.get('group_method_id', ''),
                                 activity_kw.get('group_id', '')])
    activity_tool.SQLDict_writeMessage(
      path='/'.join(m.object_path),
      method_id=m.method_id,
      priority=activity_kw.get('priority', 1),
      broadcast=activity_kw.get('broadcast', 0),
      message=self.dumpMessage(m),
      date=activity_kw.get('at_date', DateTime()),
      group_method_id=group_method_id,
      tag=activity_kw.get('tag', ''),
      order_validation_text=self.getOrderValidationText(m))
    # Also store uid of activity
Jean-Paul Smets's avatar
Jean-Paul Smets committed
79

80 81 82 83 84 85
  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Write every registered message of message_list into the SQL queue
      with a single call to SQLDict_writeMessageList.

      Messages whose is_registered flag is false are skipped. When no
      message is registered, nothing is written and no uid is generated.
      Uids come from portal_ids (id_group 'portal_activity').
    """
    registered_message_list = [message for message in message_list
                               if message.is_registered]
    if not registered_message_list:
      return
    #LOG('SQLDict prepareQueueMessageList', 0, 'registered_message_list = %r' % (registered_message_list,))
    path_list = ['/'.join(message.object_path) for message in registered_message_list]
    method_id_list = [message.method_id for message in registered_message_list]
    priority_list = [message.activity_kw.get('priority', 1) for message in registered_message_list]
    broadcast_list = [message.activity_kw.get('broadcast', 0) for message in registered_message_list]
    dumped_message_list = [self.dumpMessage(message) for message in registered_message_list]
    # All messages without an explicit at_date share the same default date.
    now_date = DateTime()
    date_list = [message.activity_kw.get('at_date', now_date) for message in registered_message_list]
    # group_method_id and group_id are stored together, separated by a NUL
    # character.
    group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''),
                                       message.activity_kw.get('group_id', '')])
                            for message in registered_message_list]
    tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
    order_validation_text_list = [self.getOrderValidationText(message)
                                  for message in registered_message_list]
    id_tool = activity_tool.getPortalObject().portal_ids
    uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity',
                                               id_count=len(registered_message_list))
    activity_tool.SQLDict_writeMessageList(
      uid_list=uid_list,
      path_list=path_list,
      method_id_list=method_id_list,
      priority_list=priority_list,
      broadcast_list=broadcast_list,
      message_list=dumped_message_list,
      date_list=date_list,
      group_method_id_list=group_method_id_list,
      tag_list=tag_list,
      order_validation_text_list=order_validation_text_list)
109

110 111
  def prepareDeleteMessage(self, activity_tool, m):
    """
      Delete from the SQL queue all messages which share the path, the
      method id and the order validation text of m.

      All matching messages are erased with a single SQLDict_delMessage
      call; nothing is called when no uid matches.
    """
    line_list = activity_tool.SQLDict_readUidList(
      path='/'.join(m.object_path),
      method_id=m.method_id,
      order_validation_text=self.getOrderValidationText(m),
      processing_node=None)
    uid_list = [line.uid for line in line_list]
    if uid_list:
      activity_tool.SQLDict_delMessage(uid=uid_list)

  # Registration management
Jean-Paul Smets's avatar
Jean-Paul Smets committed
122
  def registerActivityBuffer(self, activity_buffer):
    """
      Hook called with an activity buffer; this activity does nothing.
    """
    pass
124

Jean-Paul Smets's avatar
Jean-Paul Smets committed
125
  def isMessageRegistered(self, activity_buffer, activity_tool, m):
    """
      Tell whether a message equivalent to m is already in the buffer.

      Two messages are equivalent when they have the same object path,
      method id, tag and group id.
    """
    key = (tuple(m.object_path), m.method_id,
           m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
    return key in activity_buffer.getUidSet(self)
128

Jean-Paul Smets's avatar
Jean-Paul Smets committed
129 130
  def registerMessage(self, activity_buffer, activity_tool, m):
    """
      Mark m as registered and record it in the activity buffer.

      The (object path, method id, tag, group id) key is added to the
      buffer's uid set and m is appended to the buffer's message list.
    """
    m.is_registered = 1
    key = (tuple(m.object_path), m.method_id,
           m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
    activity_buffer.getUidSet(self).add(key)
    activity_buffer.getMessageList(self).append(m)
135

Jean-Paul Smets's avatar
Jean-Paul Smets committed
136 137
  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """
      Mark m as unregistered and remove its key from the buffer's uid set.

      The message stays in the buffer's message list; clearing the flag is
      what prevents it from being inserted into the queue.
    """
    m.is_registered = 0 # This prevents from inserting deleted messages into the queue
    key = (tuple(m.object_path), m.method_id,
           m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
    activity_buffer.getUidSet(self).discard(key)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
141 142

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """
      Return the messages of the buffer whose is_registered flag is true.
    """
    return [m for m in activity_buffer.getMessageList(self) if m.is_registered]
145
  
146
  def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
    """
      Validate message and update the queue if it is not valid.

      Returns 1 when message.validate() answers VALID. Otherwise returns 0
      after having either:
        - assigned the uids to VALIDATE_ERROR_STATE, if priority is
          already greater than MAX_PRIORITY
        - or raised the priority value by one and delayed the uids by
          VALIDATION_ERROR_DELAY.
      In both failure cases the transaction is committed.

      processing_node is not used by this method.
    """
    validation_state = message.validate(self, activity_tool, check_order_validation=0)
    if validation_state is VALID:
      return 1
    # There is a serious validation error - we must lower priority
    if priority > MAX_PRIORITY:
      # This is an error: assign message back to 'error' state
      if len(uid_list) > 0:
        activity_tool.SQLDict_assignMessage(uid=uid_list,
                                            processing_node=VALIDATE_ERROR_STATE)
      #m.notifyUser(activity_tool)                                      # Notify Error
    else:
      # Lower priority and add some delay before new processing
      if len(uid_list) > 0:
        activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
                                          priority=priority + 1, retry=1)
    get_transaction().commit() # Release locks before starting a potentially long calculation
    return 0
165

Jean-Paul Smets's avatar
Jean-Paul Smets committed
166
  # Queue semantic
Jean-Paul Smets's avatar
Jean-Paul Smets committed
167
  def dequeueMessage(self, activity_tool, processing_node):
    """
      Read one message assigned to processing_node, optionally group it
      with other messages sharing the same group method, invoke it and
      update the queue according to the result.

      Returns 1 when there was nothing to do (no SQLDict_readMessage
      method on the activity tool, or no message to process), and 0 when
      a message was picked up, whatever the outcome.

      The transaction is committed several times on the way in order to
      release locks. If an exception happens before or during invocation,
      the transaction is aborted and the selected uids are reset with
      SQLDict_setPriority so that they are no longer marked as processing.
    """
    readMessage = getattr(activity_tool, 'SQLDict_readMessage', None)
    if readMessage is None:
      return 1

    now_date = DateTime()
    result = readMessage(processing_node=processing_node, to_date=now_date)
    if len(result) > 0:
      line = result[0]
      path = line.path
      method_id = line.method_id
      group_method_id = line.group_method_id
      order_validation_text = line.order_validation_text
      uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
                                                   processing_node=None, to_date=now_date,
                                                   order_validation_text=order_validation_text,
                                                   group_method_id=group_method_id)
      uid_list = [x.uid for x in uid_list]
      # uid_list_list, priority_list and (below) message_list are kept
      # parallel: entry i of each describes the same message.
      uid_list_list = [uid_list]
      priority_list = [line.priority]
      # Make sure message can not be processed anylonger
      if len(uid_list) > 0:
        # Set selected messages to processing
        activity_tool.SQLDict_processMessage(uid=uid_list,
                                             processing_node=processing_node)
      get_transaction().commit() # Release locks before starting a potentially long calculation
      # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state

      # At this point, messages are marked as processed. So catch any kind of exception to make sure
      # that they are unmarked on error.
      try:
        m = self.loadMessage(line.message, uid=line.uid)
        message_list = [m]
        # Validate message (make sure object exists, priority OK, etc.)
        if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
          return 0

        if group_method_id not in (None, '', '\0'):
          # Count the number of objects to prevent too many objects.
          if m.hasExpandMethod():
            count = len(m.getObjectList(activity_tool))
          else:
            count = 1

          if count < MAX_GROUPED_OBJECTS:
            # Retrieve objects which have the same group method.
            result = readMessage(processing_node=processing_node,
                                 to_date=now_date, group_method_id=group_method_id,
                                 order_validation_text=order_validation_text)
            #LOG('SQLDict dequeueMessage', 0, 'result = %d, group_method_id %s' % (len(result), group_method_id))
            path_and_method_id_dict = {}
            for line in result:
              path = line.path
              method_id = line.method_id

              # Prevent using the same pair of a path and a method id.
              key = (path, method_id)
              if key in path_and_method_id_dict:
                continue
              path_and_method_id_dict[key] = 1

              uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
                                                           processing_node=None,
                                                           to_date=now_date, group_method_id=group_method_id,
                                                           order_validation_text=order_validation_text)
              uid_list = [x.uid for x in uid_list]
              if len(uid_list) > 0:
                # Set selected messages to processing
                activity_tool.SQLDict_processMessage(uid=uid_list,
                                                     processing_node=processing_node)
              get_transaction().commit() # Release locks before starting a potentially long calculation

              # Save this newly marked uids as soon as possible.
              uid_list_list.append(uid_list)

              m = self.loadMessage(line.message, uid=line.uid)
              if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
                if m.hasExpandMethod():
                  count += len(m.getObjectList(activity_tool))
                else:
                  count += 1
                message_list.append(m)
                priority_list.append(line.priority)
                if count >= MAX_GROUPED_OBJECTS:
                  break
              else:
                # If the uids were not valid, remove them from the list, as validateMessage
                # unmarked them.
                uid_list_list.pop()

          # Release locks before starting a potentially long calculation
          get_transaction().commit()

        # Remove group_id parameter from group_method_id
        if group_method_id is not None:
          group_method_id = group_method_id.split('\0')[0]
        # Try to invoke
        if group_method_id not in (None, ""):
          LOG('SQLDict', INFO,
              'invoking a group method %s with %d objects '\
              ' (%d objects in expanded form)' % (
            group_method_id, len(message_list), count))
          activity_tool.invokeGroup(group_method_id, message_list)
        else:
          activity_tool.invoke(message_list[0])

        # Check if messages are executed successfully.
        # When some of them are executed successfully, it may not be acceptable to
        # abort the transaction, because these remain pending, only due to other
        # invalid messages. This means that a group method should not be used if
        # it has a side effect. For now, only indexing uses a group method, and this
        # has no side effect.
        for m in message_list:
          if m.is_executed:
            get_transaction().commit()
            break
        else:
          abortTransactionSynchronously()
      except:
        # Deliberately catches everything: the uids must be unmarked
        # whatever went wrong.
        LOG('SQLDict', INFO,
            'an exception happened during processing %r' % (uid_list_list,),
            error=sys.exc_info())
        # If an exception occurs, abort the transaction to minimize the impact,
        try:
          abortTransactionSynchronously()
        except:
          # Unfortunately, database adapters may raise an exception against abort.
          LOG('SQLDict', WARNING,
              'abort failed, thus some objects may be modified accidentally')

        # An exception happens at somewhere else but invoke or invokeGroup, so messages
        # themselves should not be delayed.
        try:
          for uid_list in uid_list_list:
            if len(uid_list):
              # This only sets processing to zero.
              activity_tool.SQLDict_setPriority(uid=uid_list)
              get_transaction().commit()
        except:
          LOG('SQLDict', ERROR,
              'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
              error=sys.exc_info())
          raise
        return 0

      try:
        for m, uid_list, priority in zip(message_list, uid_list_list, priority_list):
          if m.is_executed:
            if len(uid_list) > 0:
              activity_tool.SQLDict_delMessage(uid=uid_list)       # Delete it
            get_transaction().commit()                             # If successful, commit
            if m.active_process:
              active_process = activity_tool.unrestrictedTraverse(m.active_process)
              if not active_process.hasActivity():
                # No more activity
                m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
          else:
            # Accept new-style exception classes as well as classic ones, so
            # that conflict errors are recognised whichever way
            # ConflictError is defined.
            if isinstance(m.exc_type, (type, ClassType)) and \
               issubclass(m.exc_type, ConflictError):
              # If this is a conflict error, do not lower the priority but only delay.
              activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY)
              get_transaction().commit() # Release locks before starting a potentially long calculation
            elif priority > MAX_PRIORITY:
              # This is an error
              if len(uid_list) > 0:
                # Assign message back to 'error' state
                activity_tool.SQLDict_assignMessage(uid=uid_list,
                                                    processing_node=INVOKE_ERROR_STATE)
              m.notifyUser(activity_tool)                                       # Notify Error
              get_transaction().commit()                                        # and commit
            else:
              # Lower priority
              if len(uid_list) > 0:
                activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
                                                  priority=priority + 1)
              get_transaction().commit() # Release locks before starting a potentially long calculation
      except:
        LOG('SQLDict', ERROR,
            'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages',
            error=sys.exc_info())
        raise

      return 0
    get_transaction().commit() # Release locks before starting a potentially long calculation
    return 1

356
  def hasActivity(self, activity_tool, object, **kw):
    """
      Tell whether the queue holds messages for object.

      Returns 0 when the activity tool has no SQLDict_hasMessage method or
      when that method returns no row, 1 when object is None, and otherwise
      whether the message_count of the first returned row is positive.
      Extra keyword arguments are passed to SQLDict_hasMessage.
    """
    hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
    if hasMessage is None:
      return 0
    if object is None:
      # Default behaviour if no object specified is to return 1 until
      # active_process implemented
      return 1
    result = hasMessage(path='/'.join(object.getPhysicalPath()), **kw)
    if len(result) > 0:
      return result[0].message_count > 0
    return 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
368
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove (and optionally invoke first) the messages of object_path,
      both those still registered in the activity tool and those already
      stored in the SQL queue.

      object_path is a tuple
      method_id, when given, restricts the flush to that method
      invoke, when true, validates and invokes each distinct method once
      before removal; ActivityFlushError is raised if a message cannot be
      validated or executed

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used

      NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
      NOTE(review): commit and extra keyword arguments are accepted but
      not used by this implementation.
    """
    path = '/'.join(object_path)
    # LOG('Flush', 0, str((path, invoke, method_id)))
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return
    # Method ids already handled, so that each method is invoked only once.
    method_dict = {}
    # Parse each message in registered
    for m in activity_tool.getRegisteredMessageList(self):
      if m.object_path == object_path and (method_id is None or method_id == m.method_id):
        if m.method_id not in method_dict:
          method_dict[m.method_id] = 1 # Prevents calling invoke twice
          if invoke:
            self._invokeFlushedMessage(activity_tool, m, path)
        activity_tool.unregisterMessage(self, m)
    # Parse each message in SQL dict
    result = readMessageList(path=path, method_id=method_id,
                             processing_node=None, include_processing=0)
    for line in result:
      path = line.path
      line_method_id = line.method_id
      if line_method_id not in method_dict:
        # Only invoke once (it would be different for a queue)
        # This is optimisation with the goal to process objects on the same
        # node and minimize network traffic with ZEO server
        method_dict[line_method_id] = 1
        m = self.loadMessage(line.message, uid=line.uid)
        if invoke:
          self._invokeFlushedMessage(activity_tool, m, path)

    if len(result):
      uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
                                                   processing_node=None)
      if len(uid_list) > 0:
        activity_tool.SQLDict_delMessage(uid=[x.uid for x in uid_list])

  def _invokeFlushedMessage(self, activity_tool, m, path):
    """
      Validate then invoke m on behalf of flush.

      Raises ActivityFlushError when the message is not valid or when it
      is not executed by the invocation. path is only used in the error
      text.
    """
    # First Validate
    validate_value = m.validate(self, activity_tool)
    if validate_value is VALID:
      activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
      if not m.is_executed:
        # Make sure message could be invoked
        raise ActivityFlushError(
            'Could not evaluate %s on %s' % (m.method_id , path))
    elif validate_value is INVALID_PATH:
      # The message no longer exists
      raise ActivityFlushError(
          'The document %s does not exist' % path)
    else:
      raise ActivityFlushError(
          'Could not validate %s on %s' % (m.method_id , path))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
444

445
  def getMessageList(self, activity_tool, processing_node=None, include_processing=0, **kw):
    """
      Return the messages stored in the SQL queue, each one carrying the
      processing_node, priority and processing values of its SQL line.

      An empty list is returned when the activity tool has no
      SQLDict_readMessageList method.

      NOTE(review): the processing_node argument and extra keyword
      arguments are not forwarded to the SQL method; only
      include_processing is.
    """
    # YO: reading all lines might cause a deadlock
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return []
    message_list = []
    result = readMessageList(path=None, method_id=None, processing_node=None,
                             to_processing_date=None,
                             include_processing=include_processing)
    for line in result:
      m = self.loadMessage(line.message, uid=line.uid)
      m.processing_node = line.processing_node
      m.priority = line.priority
      m.processing = line.processing
      message_list.append(m)
    return message_list

460 461 462
  def dumpMessageList(self, activity_tool):
    """
      Return all messages returned by SQLDict_dumpMessageList, loaded with
      their uid. An empty list is returned when the activity tool has no
      such method.
    """
    # Dump all messages in the table.
    dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
    if dumpMessageList is None:
      return []
    return [self.loadMessage(line.message, uid=line.uid)
            for line in dumpMessageList()]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
471
  def distribute(self, activity_tool, node_count):
    """
      Assign the not yet distributed messages (processing_node -1) which
      are due now to processing nodes 1..node_count.

      Broadcast messages are assigned to node 1 and copied once for every
      other node. Other messages are distributed in a round robin, except
      that messages with the same path go to the same node. The
      transaction is committed after each assignment. The node reached by
      the round robin is remembered in the module level
      LAST_PROCESSING_NODE.

      Nothing is done when the activity tool has no
      SQLDict_readMessageList method.
    """
    global LAST_PROCESSING_NODE
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return
    now_date = DateTime()
    result = readMessageList(path=None, method_id=None, processing_node=-1,
                             to_date=now_date, include_processing=0)
    get_transaction().commit()

    validation_text_dict = {'none': 1}
    message_dict = {}
    for line in result:
      message = self.loadMessage(line.message, uid = line.uid,
                                 order_validation_text = line.order_validation_text)
      self.getExecutableMessageList(activity_tool, message, message_dict,
                                    validation_text_dict)

    # XXX probably this below can be optimized by assigning multiple messages at a time.
    path_dict = {}
    assignMessage = activity_tool.SQLDict_assignMessage
    processing_node = LAST_PROCESSING_NODE
    if processing_node > node_count:
      # The remembered node may come from a previous call made with more
      # nodes; restart the round robin instead of using a node outside
      # 1..node_count.
      processing_node = 1
    id_tool = activity_tool.getPortalObject().portal_ids
    for message in message_dict.values():
      path = '/'.join(message.object_path)
      broadcast = message.activity_kw.get('broadcast', 0)
      if broadcast:
        # Broadcast messages must be distributed into all nodes.
        uid = message.uid
        assignMessage(processing_node=1, uid=[uid])
        if node_count > 1:
          # One copy of the message for each of the nodes 2..node_count.
          copy_count = node_count - 1
          uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity',
                                                     id_count=copy_count)
          path_list = [path] * copy_count
          method_id_list = [message.method_id] * copy_count
          priority_list = [message.activity_kw.get('priority', 1)] * copy_count
          processing_node_list = range(2, node_count + 1)
          broadcast_list = [1] * copy_count
          message_list = [self.dumpMessage(message)] * copy_count
          date_list = [message.activity_kw.get('at_date', now_date)] * copy_count
          group_method_id_list = ['\0'.join([message.activity_kw.get('group_method_id', ''),
                                            message.activity_kw.get('group_id', '')])] * copy_count
          tag_list = [message.activity_kw.get('tag', '')] * copy_count
          order_validation_text_list = [message.order_validation_text] * copy_count
          activity_tool.SQLDict_writeMessageList(uid_list=uid_list,
                                                 path_list=path_list,
                                                 method_id_list=method_id_list,
                                                 priority_list=priority_list,
                                                 broadcast_list=broadcast_list,
                                                 processing_node_list=processing_node_list,
                                                 message_list=message_list,
                                                 date_list=date_list,
                                                 group_method_id_list=group_method_id_list,
                                                 tag_list=tag_list,
                                                 order_validation_text_list=order_validation_text_list)
        get_transaction().commit()
      else:
        # Select a processing node. If the same path appears again, dispatch the message to
        # the same node, so that object caching is more efficient. Otherwise, apply a round
        # robin scheduling.
        node = path_dict.get(path)
        if node is None:
          node = processing_node
          path_dict[path] = node
          processing_node += 1
          if processing_node > node_count:
            processing_node = 1

        assignMessage(processing_node=node, uid=[message.uid], broadcast=0)
        get_transaction().commit() # Release locks immediately to allow processing of messages
    LAST_PROCESSING_NODE = processing_node
Jean-Paul Smets's avatar
Jean-Paul Smets committed
541

542
  # Validation private methods
543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
    if isinstance(method_id, str):
      method_id = [method_id]
    if isinstance(path, str):
      path = [path]
    if isinstance(tag, str):
      tag = [tag]

    if method_id or message_uid or path or tag:
      validateMessageList = activity_tool.SQLDict_validateMessageList
      result = validateMessageList(method_id=method_id,
                                   message_uid=message_uid,
                                   path=path,
                                   tag=tag)
      message_list = []
      for line in result:
        m = self.loadMessage(line.message,
                             uid=line.uid,
                             order_validation_text=line.order_validation_text,
                             date=line.date,
                             processing_node=line.processing_node)
        message_list.append(m)
      return message_list
    else:
      return []

569
  def _validate_after_method_id(self, activity_tool, message, value):
570
    return self._validate(activity_tool, method_id=value)
571

572
  def _validate_after_path(self, activity_tool, message, value):
573
    return self._validate(activity_tool, path=value)
574

575
  def _validate_after_message_uid(self, activity_tool, message, value):
576
    return self._validate(activity_tool, message_uid=value)
577

578
  def _validate_after_path_and_method_id(self, activity_tool, message, value):
579 580 581 582 583
    if not isinstance(value, (tuple, list)) or len(value) < 2:
      LOG('CMFActivity', WARNING,
          'unable to recognize value for after_path_and_method_id: %r' % (value,))
      return []
    return self._validate(activity_tool, path=value[0], method_id=value[1])
584

585
  def _validate_after_tag(self, activity_tool, message, value):
586
    return self._validate(activity_tool, tag=value)
587

588 589 590 591 592 593 594 595 596 597 598
  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
    # Count number of occurances of tag and method_id
    if not isinstance(value, (tuple, list)) or len(value) < 2:
      LOG('CMFActivity', WARNING,
          'unable to recognize value for after_tag_and_method_id: %r' % (value,))
      return []
    return self._validate(activity_tool, tag=value[0], method_id=value[1])

  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """Return the number of messages which match the given parameters.

    tag, path and method_id may each be given as a single string, in which
    case they are wrapped in a one-element list; any other value is passed
    through unchanged. message_uid is always passed through unchanged.
    Additional keyword arguments are accepted and ignored.

    The count is read from the uid_count attribute of the first row
    returned by activity_tool.SQLDict_validateMessageList called with
    count=1.
    """
    if isinstance(tag, str):
      tag = [tag]
    if isinstance(path, str):
      path = [path]
    if isinstance(method_id, str):
      method_id = [method_id]
    result = activity_tool.SQLDict_validateMessageList(method_id=method_id,
                                                       path=path,
                                                       message_uid=message_uid,
                                                       tag=tag,
                                                       count=1)
    return result[0].uid_count

613
  def countMessageWithTag(self, activity_tool, value):
    """Return the number of messages which match the given tag.

    value is forwarded to self.countMessage as the tag criterion.
    """
    return self.countMessage(activity_tool, tag=value)
617

618
  # Required for tests (time shift)
619
  def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
    """
      To simulate timeShift, we simply subtract delay from
      all dates in SQLDict message table

      delay, processing_node and retry are forwarded unchanged to
      activity_tool.SQLDict_timeShift; nothing is returned.
    """
    activity_tool.SQLDict_timeShift(delay=delay,
                                    processing_node=processing_node,
                                    retry=retry)
625

Jean-Paul Smets's avatar
Jean-Paul Smets committed
626
# Hand the SQLDict class to registerActivity (imported from
# Products.CMFActivity.ActivityTool) when this module is loaded.
registerActivity(SQLDict)