SQLBase.py 29.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
#                    Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import sys
30
import transaction
31 32
from _mysql_exceptions import ProgrammingError
from MySQLdb.constants.ER import NO_SUCH_TABLE
33 34
from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results
Julien Muchembled's avatar
Julien Muchembled committed
35
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
36
from ZODB.POSException import ConflictError
37
from Products.CMFActivity.ActivityTool import (
38
  Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
39
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE
40
from Products.CMFActivity.ActivityRuntimeEnvironment import (
41
  DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment, getTransactionalVariable)
42 43
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH
from Products.CMFActivity.Errors import ActivityFlushError
44

45 46 47
# Maximum number of messages written by a single SQLBase_writeMessageList
# call: SQLBase.prepareQueueMessageList splits longer lists in slices of this
# size.
# TODO: Limit by size in bytes instead of number of rows.
MAX_MESSAGE_LIST_SIZE = 100

48
def sort_message_key(message):
  """
    Return the (priority, date, uid) sort key of a message.

    priority and date are read from message.line, uid from the message
    itself. This is the same ordering as the "ORDER BY priority, date, uid"
    clause built by SQLBase._getMessageList.
  """
  return message.line.priority, message.line.date, message.uid

52
# Singleton exception instance raised by SQLBase.dequeueMessage when at least
# one message was not executed. It is recognised by identity in the except
# clause there, so that the transaction is aborted without logging it as an
# unexpected exception.
_DequeueMessageException = Exception()
53

54 55 56 57 58 59 60 61 62 63 64 65 66 67
# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict():
  """
    Build the {condition_name: render_function} mapping used by
    SQLBase._getMessageList to render WHERE conditions.

    Each render function is called as render(value, render_string), where
    render_string is the callable used to quote string values, and returns
    an SQL expression as a string.
  """
  sqltest_dict = {}
  # Values of these types are rendered with str(), without quoting.
  no_quote_type = int, float, long
  def _(name, column=None, op="="):
    # Register the renderer for condition `name`, testing `column`
    # (defaults to `name`) with comparison operator `op`.
    if column is None:
      column = name
    column_op = "%s %s " % (column, op)
    def render(value, render_string):
      # Scalar values: "<column> <op> <value>"
      if isinstance(value, no_quote_type):
        return column_op + str(value)
      if isinstance(value, DateTime):
        # Dates are rendered as quoted ISO strings in UTC.
        value = value.toZone('UTC').ISO()
      if isinstance(value, basestring):
        return column_op + render_string(value)
      # From here, value is None or an iterable: only equality makes sense.
      assert op == "=", value
      if value is None: # XXX: see comment in SQLBase._getMessageList
        return column + " IS NULL"
      # The loop body runs at most once: it only peeks at the first item to
      # choose how all items are rendered, then returns immediately.
      for x in value:
        if isinstance(x, no_quote_type):
          render_string = str
        elif isinstance(x, DateTime):
          value = (x.toZone('UTC').ISO() for x in value)
        return "%s IN (%s)" % (column, ', '.join(map(render_string, value)))
      # Empty iterable: render a condition that is always false.
      return "0"
    sqltest_dict[name] = render
  _('active_process_uid')
  _('group_method_id')
  _('method_id')
  _('path')
  _('processing')
  _('processing_node')
  _('serialization_tag')
  _('tag')
  _('to_date', column="date", op="<=")
  _('uid')
  return sqltest_dict
# The builder is only needed once: replace it by the mapping it returns.
sqltest_dict = sqltest_dict()

94
class SQLBase(Queue):
95 96 97
  """
    Define a set of common methods for SQL-based storage of activities.
  """
98 99 100
  def initialize(self, activity_tool, clear):
    folder = activity_tool.getPortalObject().portal_skins.activity
    try:
101
      createMessageTable = folder.SQLBase_createMessageTable
102 103 104 105 106 107 108 109
    except AttributeError:
      return
    if clear:
      folder.SQLBase_dropMessageTable(table=self.sql_table)
    else:
      column_list = []
      try:
        src = createMessageTable._upgradeSchema(added_list=column_list,
110 111
                                                modified_list=column_list,
                                                table=self.sql_table)
112 113 114 115 116 117 118 119 120 121 122 123
      except ProgrammingError, e:
        if e[0] != NO_SUCH_TABLE:
          raise
      else:
        if column_list and self._getMessageList(activity_tool, count=1):
          LOG('CMFActivity', ERROR, "Non-empty %r table upgraded."
              " The following added columns could not be initialized: %s\n%s"
              % (self.sql_table, ", ".join(column_list), src))
        elif src:
          LOG('CMFActivity', INFO, "%r table upgraded\n%s"
              % (self.sql_table, src))
        return
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
    createMessageTable(table=self.sql_table)

  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Write the registered messages of message_list to the SQL table.

      Messages are written by batches of at most MAX_MESSAGE_LIST_SIZE rows,
      each batch getting its uids from portal_ids. As a side effect, every
      written message gets its order_validation_text attribute set.
    """
    registered = [m for m in message_list if m.is_registered]
    portal = activity_tool.getPortalObject()
    for start in xrange(0, len(registered), MAX_MESSAGE_LIST_SIZE):
      batch = registered[start:start + MAX_MESSAGE_LIST_SIZE]
      new_uid_list = portal.portal_ids.generateNewIdList(
        self.uid_group, id_count=len(batch), id_generator='uid')
      column_kw = {
        'path_list': ['/'.join(m.object_path) for m in batch],
        'active_process_uid_list': [m.active_process_uid for m in batch],
        'method_id_list': [m.method_id for m in batch],
        'priority_list': [m.activity_kw.get('priority', 1) for m in batch],
        'date_list': [m.activity_kw.get('at_date') for m in batch],
      }
      column_kw['group_method_id_list'] = [m.getGroupId() for m in batch]
      column_kw['tag_list'] = [m.activity_kw.get('tag', '') for m in batch]
      column_kw['serialization_tag_list'] = [
        m.activity_kw.get('serialization_tag', '') for m in batch]
      node_list = []
      for m in batch:
        validation_text = self.getOrderValidationText(m)
        m.order_validation_text = validation_text
        # Processing node is 0 when the validation text is 'none',
        # -1 otherwise.
        if validation_text == 'none':
          node_list.append(0)
        else:
          node_list.append(-1)
      column_kw['processing_node_list'] = node_list
      # Messages are dumped last, once order_validation_text is set.
      portal.SQLBase_writeMessageList(
        table=self.sql_table,
        uid_list=new_uid_list,
        message_list=[Message.dump(m) for m in batch],
        **column_kw)
159

160 161 162 163 164 165 166 167 168 169
  def getNow(self, context):
    """
      Return the value of NOW() as evaluated by the SQL server.

      The value is fetched on every call (no cache) and is not
      transactional on MySQL side.
      context.SQLBase_getNow() must return exactly one row of one column.
    """
    row_list = context.SQLBase_getNow()
    assert len(row_list) == 1
    row = row_list[0]
    assert len(row) == 1
    return row[0]
170

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
  def _getMessageList(self, activity_tool, offset=0, count=1000, src__=0, **kw):
    """
      Select rows of the message table, ordered by priority, date and uid.

      offset, count
        Rendered as a LIMIT clause. A count of None means no upper bound.
      src__
        If true, return the SQL source instead of executing it.
      kw
        Conditions, rendered with the functions of sqltest_dict and joined
        with AND.
    """
    # XXX: Because most columns have NOT NULL constraint, conditions with None
    #      value should be ignored, instead of trying to render them
    #      (with comparisons with NULL).
    connection = activity_tool.getPortalObject().cmf_activity_sql_connection
    quote = connection.sql_quote__
    if offset:
      if count is None:
        row_count = sys.maxint
      else:
        row_count = count
      limit_clause = '\nLIMIT %d,%d' % (offset, row_count)
    elif count is None:
      limit_clause = ''
    else:
      limit_clause = '\nLIMIT %d' % count
    condition_list = [sqltest_dict[name](value, quote)
                      for name, value in kw.iteritems()]
    where_clause = '\n  AND '.join(condition_list)
    if where_clause:
      where_clause = '\nWHERE ' + where_clause
    query = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
      self.sql_table, where_clause, limit_clause)
    if src__:
      return query
    return Results(connection().query(query, max_rows=0))

  def getMessageList(self, *args, **kw):
    """
      Return the messages matching the given conditions, as Message objects.

      Arguments are passed unchanged to _getMessageList.
      When SQL source is requested (src__=1), a 1-tuple containing that
      source is returned instead.
    """
    result = self._getMessageList(*args, **kw)
    if isinstance(result, str): # src__ == 1
      return result,
    class_name = self.__class__.__name__
    return [Message.load(line.message,
                         activity=class_name,
                         uid=line.uid,
                         processing_node=line.processing_node,
                         retry=line.retry,
                         processing=line.processing)
            for line in result]
198

199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """
      Return the number of messages which match the given parameters.

      tag, path and method_id may each be given as a single string or as a
      list. Extra keyword arguments are accepted and ignored.
    """
    def as_list(value):
      # A lone string stands for a list of one item.
      if isinstance(value, str):
        return [value]
      return value
    tag = as_list(tag)
    path = as_list(path)
    method_id = as_list(method_id)
    row_list = activity_tool.SQLBase_validateMessageList(
      table=self.sql_table,
      method_id=method_id,
      path=path,
      message_uid=message_uid,
      tag=tag,
      serialization_tag=None,
      count=1)
    return row_list[0].uid_count

  def hasActivity(self, activity_tool, object, method_id=None, only_valid=None,
                  active_process_uid=None):
    """
      Tell whether at least one message matches the given criteria.

      object may be None, in which case messages are not filtered by path.
      Return 0 when SQLBase_hasMessage is not available on activity_tool or
      when it returns no row.
    """
    has_message = getattr(activity_tool, 'SQLBase_hasMessage', None)
    if has_message is None:
      return 0
    path = None
    if object is not None:
      path = '/'.join(object.getPhysicalPath())
    row_list = has_message(table=self.sql_table,
                           path=path,
                           method_id=method_id,
                           only_valid=only_valid,
                           active_process_uid=active_process_uid)
    if not row_list:
      return 0
    return row_list[0].message_count > 0

  def getPriority(self, activity_tool):
    """
      Return the 'priority' value of the single row returned by
      SQLBase_getPriority, falling back on Queue.getPriority when that query
      returns nothing.
    """
    row_list = activity_tool.SQLBase_getPriority(table=self.sql_table)
    if not row_list:
      return Queue.getPriority(self, activity_tool)
    row_count = len(row_list)
    assert row_count == 1, row_count
    return row_list[0]['priority']
238

239
  def _retryOnLockError(self, method, args=(), kw={}):
240 241
    while True:
      try:
242
        return method(*args, **kw)
243 244 245 246
      except ConflictError:
        # Note that this code assumes that a database adapter translates
        # a lock error into a conflict error.
        LOG('SQLBase', INFO, 'Got a lock error, retrying...')
247

248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
  # Validation private methods
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
                serialization_tag=None):
    if isinstance(method_id, str):
      method_id = [method_id]
    if isinstance(path, str):
      path = [path]
    if isinstance(tag, str):
      tag = [tag]

    if method_id or message_uid or path or tag or serialization_tag:
      result = activity_tool.SQLBase_validateMessageList(table=self.sql_table,
          method_id=method_id,
          message_uid=message_uid,
          path=path,
          tag=tag,
          count=False,
          serialization_tag=serialization_tag)
      message_list = []
      for line in result:
        m = Message.load(line.message,
                         line=line,
                         uid=line.uid,
                         date=line.date,
                         processing_node=line.processing_node)
        if not hasattr(m, 'order_validation_text'): # BBB
          m.order_validation_text = self.getOrderValidationText(m)
        message_list.append(m)
      return message_list

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
  def _validate_after_method_id(self, activity_tool, message, value):
    return self._validate(activity_tool, method_id=value)

  def _validate_after_path(self, activity_tool, message, value):
    return self._validate(activity_tool, path=value)

  def _validate_after_message_uid(self, activity_tool, message, value):
    return self._validate(activity_tool, message_uid=value)

  def _validate_after_path_and_method_id(self, activity_tool, message, value):
    if not (isinstance(value, (tuple, list)) and len(value) == 2):
      LOG('CMFActivity', WARNING,
          'unable to recognize value for after_path_and_method_id: %r' % (value,))
      return []
    return self._validate(activity_tool, path=value[0], method_id=value[1])

  def _validate_after_tag(self, activity_tool, message, value):
    return self._validate(activity_tool, tag=value)

  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
    # Count number of occurances of tag and method_id
    if not (isinstance(value, (tuple, list)) and len(value) == 2):
      LOG('CMFActivity', WARNING,
          'unable to recognize value for after_tag_and_method_id: %r' % (value,))
      return []
    return self._validate(activity_tool, tag=value[0], method_id=value[1])

  def _validate_serialization_tag(self, activity_tool, message, value):
    return self._validate(activity_tool, serialization_tag=value)
307 308 309 310 311

  def _log(self, severity, summary):
    """
      Log summary under the name of this class.
      For severities above INFO, the exception being handled (if any) is
      attached to the log entry.
    """
    if severity > INFO:
      error = sys.exc_info()
    else:
      error = None
    LOG(self.__class__.__name__, severity, summary, error=error)

312 313
  def getReservedMessageList(self, activity_tool, date, processing_node,
                             limit=None, group_method_id=None):
    """
      Get and reserve a list of messages.

      Messages already reserved for processing_node are selected first;
      if fewer than limit are found, more are reserved among the messages
      whose date is not later than the given date.

      limit
        Maximum number of messages to fetch.
        It is required in practice: the method asserts that it is true, so
        the default None (or 0) is rejected.
        This number is not guaranteed to be reached, because of:
         - not enough messages being pending execution
         - race condition (other nodes reserving the same messages at the same
           time)
        This number is guaranteed not to be exceeded.
      group_method_id
        Passed to both SQL selections to filter messages.

      Return the Results object of already-reserved messages, extended in
      place with the newly reserved rows.
    """
    assert limit
    # Do not check already-assigned messages when trying to reserve more
    # activities, because in such case we will find one reserved activity.
    result = activity_tool.SQLBase_selectReservedMessageList(
      table=self.sql_table,
      count=limit,
      processing_node=processing_node,
      group_method_id=group_method_id,
    )
    limit -= len(result)
    if limit:
      reservable = activity_tool.SQLBase_getReservableMessageList(
        table=self.sql_table,
        count=limit,
        processing_node=processing_node,
        to_date=date,
        group_method_id=group_method_id,
      )
      if reservable:
        activity_tool.SQLBase_reserveMessageList(
          uid=[x.uid for x in reservable],
          table=self.sql_table,
          processing_node=processing_node,
        )
        # DC.ZRDB.Results.Results does not implement concatenation
        # Implement an imperfect (but cheap) concatenation. Do not update
        # __items__ nor _data_dictionary.
        assert result._names == reservable._names, (result._names,
          reservable._names)
        result._data += reservable._data
    return result

  def makeMessageListAvailable(self, activity_tool, uid_list):
    """
      Put messages back in processing_node=0 .
      Nothing is sent to the database when uid_list is empty.
    """
    if uid_list:
      activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table,
                                                     uid=uid_list)
364

365 366 367 368
  def getProcessableMessageLoader(self, activity_tool, processing_node):
    """
      Return the function used by getProcessableMessageList to turn a
      reserved row into a (message, uid, duplicate_uid_list) tuple.

      This implementation does not merge anything: the list of duplicate
      uids is always empty.
    """
    # do not merge anything
    def load(line):
      uid = line.uid
      m = Message.load(line.message, uid=uid, line=line)
      return m, uid, ()
    return load

373 374 375 376 377 378 379 380 381
  def getProcessableMessageList(self, activity_tool, processing_node):
    """
      Always true:
        For each reserved message, delete redundant messages when it gets
        reserved (definitely lost, but they are expendable since redundant).

      - reserve a message
      - set reserved message to processing=1 state
      - if this message has a group_method_id:
        - reserve a bunch of messages
        - until the total "cost" of the group goes over 1
          - get one message from the reserved bunch (this messages will be
            "needed")
          - update the total cost
        - set "needed" reserved messages to processing=1 state
        - unreserve "unneeded" messages
      - return still-reserved message list and a group_method_id

      If any error happens in above described process, try to unreserve all
      messages already reserved in that process.
      If it fails, complain loudly that some messages might still be in an
      unclean state.

      Returned values:
        3-tuple:
          - list of messages
          - group_method_id
          - uid_to_duplicate_uid_list_dict
        When nothing could be reserved, or on error, the list of messages is
        empty and group_method_id is None.
    """
    def getReservedMessageList(limit, group_method_id=None):
      # Reserve up to `limit` messages for this node and trace their uids.
      line_list = self.getReservedMessageList(activity_tool=activity_tool,
                                              date=now_date,
                                              processing_node=processing_node,
                                              limit=limit,
                                              group_method_id=group_method_id)
      if line_list:
        self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
      return line_list
    now_date = self.getNow(activity_tool)
    uid_to_duplicate_uid_list_dict = {}
    try:
      result = getReservedMessageList(1)
      if result:
        load = self.getProcessableMessageLoader(activity_tool, processing_node)
        m, uid, uid_list = load(result[0])
        message_list = [m]
        uid_to_duplicate_uid_list_dict[uid] = uid_list
        group_method_id = m.line.group_method_id
        # '\0' is the group_method_id value of messages which are not grouped.
        if group_method_id != '\0':
          # Count the number of objects to prevent too many objects.
          cost = m.activity_kw.get('group_method_cost', .01)
          assert 0 < cost <= 1, (self.sql_table, uid)
          count = m.getObjectCount(activity_tool)
          # this is heuristic (messages with same group_method_id
          # are likely to have the same group_method_cost)
          limit = int(1. / cost + 1 - count)
          if limit > 1: # <=> cost * count < 1
            cost *= count
            # Retrieve objects which have the same group method.
            # An iterator is used so that the remaining lines can be consumed
            # (and unreserved) from inside the loop.
            result = iter(getReservedMessageList(limit, group_method_id))
            for line in result:
              # Skip lines which were already loaded (e.g. the first message).
              if line.uid in uid_to_duplicate_uid_list_dict:
                continue
              m, uid, uid_list = load(line)
              if m is None:
                # The loader reported this line as a duplicate of `uid`.
                uid_to_duplicate_uid_list_dict[uid] += uid_list
                continue
              uid_to_duplicate_uid_list_dict[uid] = uid_list
              cost += m.getObjectCount(activity_tool) * \
                      m.activity_kw.get('group_method_cost', .01)
              message_list.append(m)
              if cost >= 1:
                # Unreserve extra messages as soon as possible.
                # This exhausts `result`, which also ends the loop.
                self.makeMessageListAvailable(activity_tool=activity_tool,
                  uid_list=[line.uid for line in result if line.uid != uid])
        activity_tool.SQLBase_processMessage(table=self.sql_table,
          uid=uid_to_duplicate_uid_list_dict.keys())
        return message_list, group_method_id, uid_to_duplicate_uid_list_dict
    except:
      self._log(WARNING, 'Exception while reserving messages.')
      if uid_to_duplicate_uid_list_dict:
        # Free every message known so far, duplicates included.
        to_free_uid_list = uid_to_duplicate_uid_list_dict.keys()
        for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
          to_free_uid_list += uid_list
        try:
          self.makeMessageListAvailable(activity_tool=activity_tool,
                                        uid_list=to_free_uid_list)
        except:
          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
        else:
          if to_free_uid_list:
            self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
      else:
        self._log(TRACE, '(no message was reserved)')
    return [], None, uid_to_duplicate_uid_list_dict
468

469 470 471 472 473 474 475 476 477
  def _abort(self):
    """
      Abort the current transaction.
      If aborting fails, log it at PANIC level and let the exception
      propagate.
    """
    try:
      transaction.abort()
    except:
      # Unfortunately, database adapters may raise an exception against abort.
      summary = 'abort failed, thus some objects may be modified accidentally'
      self._log(PANIC, summary)
      raise

478 479
  # Queue semantic
  def dequeueMessage(self, activity_tool, processing_node):
    """
      Reserve processable messages for processing_node, invoke them (as a
      group if they have a group method, individually otherwise) and
      finalize their execution.

      Return True if no message could be processed, False otherwise.
    """
    message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
      self.getProcessableMessageList(activity_tool, processing_node)
    if message_list:
      # Remove group_id parameter from group_method_id
      if group_method_id is not None:
        group_method_id = group_method_id.split('\0')[0]
      # Bound for both branches: the error callback handling below needs it
      # whatever the invocation mode, and it already reads exc_info from the
      # first message.
      message = message_list[0]
      if group_method_id not in (None, ""):
        method = activity_tool.invokeGroup
        args = (group_method_id, message_list, self.__class__.__name__,
                hasattr(self, 'generateMessageUID'))
        activity_runtime_environment = ActivityRuntimeEnvironment(None)
      else:
        method = activity_tool.invoke
        args = (message, )
        activity_runtime_environment = ActivityRuntimeEnvironment(message)
      # Commit right before executing messages.
      # As MySQL transaction does not start exactly at the same time as ZODB
      # transactions but a bit later, messages available might be called
      # on objects which are not available - or available in an old
      # version - to ZODB connector.
      # So all connectors must be committed now that we have selected
      # everything needed from MySQL to get a fresh view of ZODB objects.
      transaction.commit()
      transaction.begin()
      tv = getTransactionalVariable()
      tv['activity_runtime_environment'] = activity_runtime_environment
      # Try to invoke
      try:
        method(*args)
        # Abort if at least 1 message failed. On next tic, only those that
        # succeeded will be selected because their at_date won't have been
        # increased.
        for m in message_list:
          if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
            raise _DequeueMessageException
        transaction.commit()
      except:
        exc_info = sys.exc_info()
        # _DequeueMessageException only signals that a message failed in a
        # way which was already recorded: anything else is unexpected and
        # marks all messages as not executed.
        if exc_info[1] is not _DequeueMessageException:
          self._log(WARNING,
            'Exception raised when invoking messages (uid, path, method_id) %r'
            % [(m.uid, m.object_path, m.method_id) for m in message_list])
          for m in message_list:
            m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        self._abort()
        exc_info = message_list[0].exc_info
        if exc_info:
          try:
            # Register it again.
            tv['activity_runtime_environment'] = activity_runtime_environment
            cancel = message.on_error_callback(*exc_info)
            del exc_info, message.exc_info
            transaction.commit()
            if cancel:
              message.setExecutionState(MESSAGE_EXECUTED)
          except:
            self._log(WARNING, 'Exception raised when processing error callbacks')
            message.setExecutionState(MESSAGE_NOT_EXECUTED)
            self._abort()
      self.finalizeMessageExecution(activity_tool, message_list,
                                    uid_to_duplicate_uid_list_dict)
    transaction.commit()
    return not message_list

545 546 547 548 549 550 551
  def finalizeMessageExecution(self, activity_tool, message_list,
                               uid_to_duplicate_uid_list_dict=None):
    """
      If everything was fine, delete all messages.
      If anything failed, make successful messages available (if any), and
      the following rules apply to failed messages:
        - Failures due to ConflictErrors cause messages to be postponed,
          but their retry count is *not* increased.
        - Failures of messages already above maximum retry count cause them to
          be put in a permanent-error state.
        - In all other cases, retry count is increased and message is delayed.

      uid_to_duplicate_uid_list_dict
        Optional mapping from the uid of a message to the uids of its
        duplicates, which then get the same treatment as executed messages
        or are made available again. When it is None, executed messages are
        always deleted, even if other messages failed.

      Database errors are logged and never propagated by this method.
    """
    deletable_uid_list = []
    delay_uid_list = []
    final_error_uid_list = []
    make_available_uid_list = []
    notify_user_list = []
    # List receiving the uids of executed messages: they are deleted, unless
    # another message of the same batch failed.
    executed_uid_list = deletable_uid_list
    if uid_to_duplicate_uid_list_dict is not None:
      for m in message_list:
        if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
          executed_uid_list = make_available_uid_list
          break
    for m in message_list:
      uid = m.uid
      if m.getExecutionState() == MESSAGE_EXECUTED:
        executed_uid_list.append(uid)
        if uid_to_duplicate_uid_list_dict is not None:
          executed_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
      elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
        # Should duplicate messages follow strictly the original message, or
        # should they be just made available again ?
        if uid_to_duplicate_uid_list_dict is not None:
          make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
        if (m.exc_type and # m.exc_type may be None
            m.conflict_retry and issubclass(m.exc_type, ConflictError)):
          delay_uid_list.append(uid)
        else:
          max_retry = m.max_retry
          retry = m.line.retry
          if max_retry is not None and retry >= max_retry:
            # Always notify when we stop retrying.
            notify_user_list.append((m, False))
            final_error_uid_list.append(uid)
            # Messages in final error state are not made available again.
            continue
          # In case of infinite retry, notify the user
          # when the default limit is reached.
          if max_retry is None and retry == DEFAULT_MAX_RETRY:
            notify_user_list.append((m, True))
          delay = m.delay
          if delay is None:
            # By default, make delay quadratic to the number of retries.
            delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) / 2
          try:
            # Immediately update, because values different for every message
            activity_tool.SQLBase_reactivate(table=self.sql_table,
                                             uid=[uid],
                                             delay=delay,
                                             retry=1)
          except:
            self._log(WARNING, 'Failed to reactivate %r' % uid)
        make_available_uid_list.append(uid)
      else: # MESSAGE_NOT_EXECUTABLE
        # 'path' does not point to any object. Activities are normally flushed
        # (without invoking them) when an object is deleted, but this is only
        # an optimisation. There is no efficient and reliable way to do such
        # this, because a concurrent and very long transaction may be about to
        # activate this object, without conflict.
        # So we have to clean up any remaining activity.
        deletable_uid_list.append(uid)
    if deletable_uid_list:
      try:
        self._retryOnLockError(activity_tool.SQLBase_delMessage,
                               kw={'table': self.sql_table,
                                   'uid': deletable_uid_list})
      except:
        self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
      else:
        self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
    if delay_uid_list:
      try:
        # If this is a conflict error, do not increase 'retry' but only delay.
        activity_tool.SQLBase_reactivate(table=self.sql_table,
          uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, retry=None)
      except:
        self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
    if final_error_uid_list:
      try:
        activity_tool.SQLBase_assignMessage(table=self.sql_table,
          uid=final_error_uid_list, processing_node=INVOKE_ERROR_STATE)
      except:
        self._log(ERROR, 'Failed to set message to error state for %r'
                         % final_error_uid_list)
    if make_available_uid_list:
      try:
        self.makeMessageListAvailable(activity_tool=activity_tool,
                                      uid_list=make_available_uid_list)
      except:
        self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
      else:
        self._log(TRACE, 'Freed messages %r' % make_available_uid_list)
    try:
      for m, retry in notify_user_list:
        m.notifyUser(activity_tool, retry)
    except:
      # Notification failures must not cause this method to raise.
      self._log(WARNING,
        'Exception during notification phase of finalizeMessageExecution')
653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693

  def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
    """
      Remove the messages of the object at object_path, both those still
      registered on activity_tool and those stored in the SQL table with
      processing=0.

      object_path is a tuple
      invoke
        If true, each message is invoked before being removed, and
        ActivityFlushError is raised as soon as one cannot be validated or
        executed.
      method_id
        If given, only messages for this method are flushed.

      Extra keyword arguments are accepted and ignored.
    """
    path = '/'.join(object_path)
    if invoke:
      # Keys of messages which were already invoked, for queues which define
      # generateMessageUID.
      invoked = set()
      # From here on, `invoke` is both the (true) flag and the function
      # which invokes one message.
      def invoke(message):
        try:
          key = self.generateMessageUID(message)
          if key in invoked:
            return
          invoked.add(key)
        except AttributeError:
          pass
        line = getattr(message, 'line', None)
        # Messages loaded from a row whose processing_node is not -1 are
        # considered valid without calling validate().
        validate_value = VALID if line and line.processing_node != -1 else \
                         message.validate(self, activity_tool)
        if validate_value == VALID:
          # Try to invoke the message - what happens if invoke calls flushActivity ??
          activity_tool.invoke(message)
          if message.getExecutionState() != MESSAGE_EXECUTED:
            raise ActivityFlushError('Could not invoke %s on %s'
                                     % (message.method_id, path))
        elif validate_value is INVALID_PATH:
          raise ActivityFlushError('The document %s does not exist' % path)
        else:
          raise ActivityFlushError('Could not validate %s on %s'
                                   % (message.method_id, path))
    for m in activity_tool.getRegisteredMessageList(self):
      if object_path == m.object_path and (
         method_id is None or method_id == m.method_id):
        if invoke:
          invoke(m)
        activity_tool.unregisterMessage(self, m)
    uid_list = []
    # NOTE(review): _getMessageList is called with its default count (1000),
    # so at most that many rows are handled by one call - confirm that this
    # is intended.
    for line in self._getMessageList(activity_tool, path=path, processing=0,
        **({'method_id': method_id} if method_id else {})):
      uid_list.append(line.uid)
      if invoke:
        invoke(Message.load(line.message, uid=line.uid, line=line))
    if uid_list:
      activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
697 698 699 700 701 702 703 704 705

  # Required for tests
  def timeShift(self, activity_tool, delay, processing_node=None):
    """
      To simulate time shift, we simply subtract delay from
      all dates in message(_queue) table.
      processing_node is passed as is to the SQL method.
    """
    shift = activity_tool.SQLBase_timeShift
    shift(table=self.sql_table,
          delay=delay,
          processing_node=processing_node)