SQLBase.py 27.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
#                    Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import sys
30
import transaction
31 32
from _mysql_exceptions import ProgrammingError
from MySQLdb.constants.ER import NO_SUCH_TABLE
33 34
from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results
Julien Muchembled's avatar
Julien Muchembled committed
35
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
36
from ZODB.POSException import ConflictError
37
from Products.CMFActivity.ActivityTool import (
38
  Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED)
39
from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE
40
from Products.CMFActivity.ActivityRuntimeEnvironment import (
41
  DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment, getTransactionalVariable)
42 43
from Queue import Queue, VALIDATION_ERROR_DELAY, VALID, INVALID_PATH
from Products.CMFActivity.Errors import ActivityFlushError
44

45
def sort_message_key(message):
46
  # same sort key as in SQLBase.getMessageList
47 48
  return message.line.priority, message.line.date, message.uid

49
_DequeueMessageException = Exception()
50

51 52 53 54 55 56 57 58 59 60 61 62 63 64
# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict():
  sqltest_dict = {}
  no_quote_type = int, float, long
  def _(name, column=None, op="="):
    if column is None:
      column = name
    column_op = "%s %s " % (column, op)
    def render(value, render_string):
      if isinstance(value, no_quote_type):
        return column_op + str(value)
      if isinstance(value, DateTime):
        value = value.toZone('UTC').ISO()
65 66 67 68 69 70 71 72 73 74 75 76
      if isinstance(value, basestring):
        return column_op + render_string(value)
      assert op == "=", value
      if value is None: # XXX: see comment in SQLBase._getMessageList
        return column + " IS NULL"
      for x in value:
        if isinstance(x, no_quote_type):
          render_string = str
        elif isinstance(x, DateTime):
          value = (x.toZone('UTC').ISO() for x in value)
        return "%s IN (%s)" % (column, ', '.join(map(render_string, value)))
      return "0"
77 78 79 80 81 82 83 84 85 86 87 88 89 90
    sqltest_dict[name] = render
  _('active_process_uid')
  _('group_method_id')
  _('method_id')
  _('path')
  _('processing')
  _('processing_node')
  _('serialization_tag')
  _('tag')
  _('to_date', column="date", op="<=")
  _('uid')
  return sqltest_dict
sqltest_dict = sqltest_dict()

91
class SQLBase(Queue):
92 93 94
  """
    Define a set of common methods for SQL-based storage of activities.
  """
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
  def initialize(self, activity_tool, clear):
    folder = activity_tool.getPortalObject().portal_skins.activity
    try:
      createMessageTable = getattr(folder,
        self.__class__.__name__ + '_createMessageTable')
    except AttributeError:
      return
    if clear:
      folder.SQLBase_dropMessageTable(table=self.sql_table)
    else:
      column_list = []
      try:
        src = createMessageTable._upgradeSchema(added_list=column_list,
                                                modified_list=column_list)
      except ProgrammingError, e:
        if e[0] != NO_SUCH_TABLE:
          raise
      else:
        if column_list and self._getMessageList(activity_tool, count=1):
          LOG('CMFActivity', ERROR, "Non-empty %r table upgraded."
              " The following added columns could not be initialized: %s\n%s"
              % (self.sql_table, ", ".join(column_list), src))
        elif src:
          LOG('CMFActivity', INFO, "%r table upgraded\n%s"
              % (self.sql_table, src))
        return
    createMessageTable()
122

123 124 125 126 127 128 129 130 131 132
  def getNow(self, context):
    """
      Return the current value for SQL server's NOW().
      Note that this value is not cached, and is not transactionnal on MySQL
      side.
    """
    result = context.SQLBase_getNow()
    assert len(result) == 1
    assert len(result[0]) == 1
    return result[0][0]
133

134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
  def _getMessageList(self, activity_tool, offset=0, count=1000, src__=0, **kw):
    # XXX: Because most columns have NOT NULL constraint, conditions with None
    #      value should be ignored, instead of trying to render them
    #      (with comparisons with NULL).
    sql_connection = activity_tool.getPortalObject().cmf_activity_sql_connection
    q = sql_connection.sql_quote__
    if offset:
      limit = '\nLIMIT %d,%d' % (offset, sys.maxint if count is None else count)
    else:
      limit = '' if count is None else '\nLIMIT %d' % count
    sql = '\n  AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems())
    sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
      self.sql_table, sql and '\nWHERE ' + sql, limit)
    return sql if src__ else Results(sql_connection().query(sql, max_rows=0))

  def getMessageList(self, *args, **kw):
    result = self._getMessageList(*args, **kw)
    if type(result) is str: # src__ == 1
      return result,
153
    class_name = self.__class__.__name__
154
    return [Message.load(line.message,
155
                             activity=class_name,
156 157
                             uid=line.uid,
                             processing_node=line.processing_node,
158
                             retry=line.retry,
159
                             processing=line.processing)
160
      for line in result]
161

162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """Return the number of messages which match the given parameters.
    """
    if isinstance(tag, str):
      tag = [tag]
    if isinstance(path, str):
      path = [path]
    if isinstance(method_id, str):
      method_id = [method_id]
    result = activity_tool.SQLBase_validateMessageList(table=self.sql_table,
                                                       method_id=method_id,
                                                       path=path,
                                                       message_uid=message_uid,
                                                       tag=tag,
                                                       serialization_tag=None,
                                                       count=1)
    return result[0].uid_count

  def hasActivity(self, activity_tool, object, method_id=None, only_valid=None,
                  active_process_uid=None):
    hasMessage = getattr(activity_tool, 'SQLBase_hasMessage', None)
    if hasMessage is not None:
      if object is None:
        path = None
      else:
        path = '/'.join(object.getPhysicalPath())
      result = hasMessage(table=self.sql_table, path=path, method_id=method_id,
        only_valid=only_valid, active_process_uid=active_process_uid)
      if result:
        return result[0].message_count > 0
    return 0

  def getPriority(self, activity_tool):
    result = activity_tool.SQLBase_getPriority(table=self.sql_table)
    if result:
      assert len(result) == 1, len(result)
      return result[0]['priority']
    return Queue.getPriority(self, activity_tool)
201

202
  def _retryOnLockError(self, method, args=(), kw={}):
203 204
    while True:
      try:
205
        return method(*args, **kw)
206 207 208 209
      except ConflictError:
        # Note that this code assumes that a database adapter translates
        # a lock error into a conflict error.
        LOG('SQLBase', INFO, 'Got a lock error, retrying...')
210

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
  # Validation private methods
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
                serialization_tag=None):
    if isinstance(method_id, str):
      method_id = [method_id]
    if isinstance(path, str):
      path = [path]
    if isinstance(tag, str):
      tag = [tag]

    if method_id or message_uid or path or tag or serialization_tag:
      result = activity_tool.SQLBase_validateMessageList(table=self.sql_table,
          method_id=method_id,
          message_uid=message_uid,
          path=path,
          tag=tag,
          count=False,
          serialization_tag=serialization_tag)
      message_list = []
      for line in result:
        m = Message.load(line.message,
                         line=line,
                         uid=line.uid,
                         date=line.date,
                         processing_node=line.processing_node)
        if not hasattr(m, 'order_validation_text'): # BBB
          m.order_validation_text = self.getOrderValidationText(m)
        message_list.append(m)
      return message_list

241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
  def _validate_after_method_id(self, activity_tool, message, value):
    return self._validate(activity_tool, method_id=value)

  def _validate_after_path(self, activity_tool, message, value):
    return self._validate(activity_tool, path=value)

  def _validate_after_message_uid(self, activity_tool, message, value):
    return self._validate(activity_tool, message_uid=value)

  def _validate_after_path_and_method_id(self, activity_tool, message, value):
    if not (isinstance(value, (tuple, list)) and len(value) == 2):
      LOG('CMFActivity', WARNING,
          'unable to recognize value for after_path_and_method_id: %r' % (value,))
      return []
    return self._validate(activity_tool, path=value[0], method_id=value[1])

  def _validate_after_tag(self, activity_tool, message, value):
    return self._validate(activity_tool, tag=value)

  def _validate_after_tag_and_method_id(self, activity_tool, message, value):
    # Count number of occurances of tag and method_id
    if not (isinstance(value, (tuple, list)) and len(value) == 2):
      LOG('CMFActivity', WARNING,
          'unable to recognize value for after_tag_and_method_id: %r' % (value,))
      return []
    return self._validate(activity_tool, tag=value[0], method_id=value[1])

  def _validate_serialization_tag(self, activity_tool, message, value):
    return self._validate(activity_tool, serialization_tag=value)
270 271 272 273 274

  def _log(self, severity, summary):
    LOG(self.__class__.__name__, severity, summary,
        error=severity>INFO and sys.exc_info() or None)

275 276
  def getReservedMessageList(self, activity_tool, date, processing_node,
                             limit=None, group_method_id=None):
277 278 279 280 281 282 283 284 285 286 287
    """
      Get and reserve a list of messages.
      limit
        Maximum number of messages to fetch.
        This number is not garanted to be reached, because of:
         - not enough messages being pending execution
         - race condition (other nodes reserving the same messages at the same
           time)
        This number is guaranted not to be exceeded.
        If None (or not given) no limit apply.
    """
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
    assert limit
    # Do not check already-assigned messages when trying to reserve more
    # activities, because in such case we will find one reserved activity.
    result = activity_tool.SQLBase_selectReservedMessageList(
      table=self.sql_table,
      count=limit,
      processing_node=processing_node,
      group_method_id=group_method_id,
    )
    limit -= len(result)
    if limit:
      reservable = activity_tool.SQLBase_getReservableMessageList(
        table=self.sql_table,
        count=limit,
        processing_node=processing_node,
        to_date=date,
        group_method_id=group_method_id,
      )
      if reservable:
        activity_tool.SQLBase_reserveMessageList(
          uid=[x.uid for x in reservable],
          table=self.sql_table,
          processing_node=processing_node,
        )
        # DC.ZRDB.Results.Results does not implement concatenation
        # Implement an imperfect (but cheap) concatenation. Do not update
        # __items__ nor _data_dictionary.
        assert result._names == reservable._names, (result._names,
          reservable._names)
        result._data += reservable._data
318 319 320 321 322 323 324
    return result

  def makeMessageListAvailable(self, activity_tool, uid_list):
    """
      Put messages back in processing_node=0 .
    """
    if len(uid_list):
325 326
      activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table,
                                                     uid=uid_list)
327

328 329 330 331
  def getProcessableMessageLoader(self, activity_tool, processing_node):
    # do not merge anything
    def load(line):
      uid = line.uid
332
      m = Message.load(line.message, uid=uid, line=line)
333 334 335
      return m, uid, ()
    return load

336 337 338 339 340 341 342 343 344
  def getProcessableMessageList(self, activity_tool, processing_node):
    """
      Always true:
        For each reserved message, delete redundant messages when it gets
        reserved (definitely lost, but they are expandable since redundant).

      - reserve a message
      - set reserved message to processing=1 state
      - if this message has a group_method_id:
345 346
        - reserve a bunch of messages
        - until the total "cost" of the group goes over 1
347 348
          - get one message from the reserved bunch (this messages will be
            "needed")
349
          - update the total cost
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370
        - set "needed" reserved messages to processing=1 state
        - unreserve "unneeded" messages
      - return still-reserved message list and a group_method_id

      If any error happens in above described process, try to unreserve all
      messages already reserved in that process.
      If it fails, complain loudly that some messages might still be in an
      unclean state.

      Returned values:
        4-tuple:
          - list of messages
          - group_method_id
          - uid_to_duplicate_uid_list_dict
    """
    def getReservedMessageList(limit, group_method_id=None):
      line_list = self.getReservedMessageList(activity_tool=activity_tool,
                                              date=now_date,
                                              processing_node=processing_node,
                                              limit=limit,
                                              group_method_id=group_method_id)
371
      if line_list:
372
        self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list])
373 374
      return line_list
    now_date = self.getNow(activity_tool)
375
    uid_to_duplicate_uid_list_dict = {}
376
    try:
377 378
      result = getReservedMessageList(1)
      if result:
379 380
        load = self.getProcessableMessageLoader(activity_tool, processing_node)
        m, uid, uid_list = load(result[0])
381
        message_list = [m]
382 383
        uid_to_duplicate_uid_list_dict[uid] = uid_list
        group_method_id = m.line.group_method_id
384
        if group_method_id != '\0':
385
          # Count the number of objects to prevent too many objects.
386 387
          cost = m.activity_kw.get('group_method_cost', .01)
          assert 0 < cost <= 1, (self.sql_table, uid)
388
          count = m.getObjectCount(activity_tool)
389 390 391 392 393
          # this is heuristic (messages with same group_method_id
          # are likely to have the same group_method_cost)
          limit = int(1. / cost + 1 - count)
          if limit > 1: # <=> cost * count < 1
            cost *= count
394
            # Retrieve objects which have the same group method.
395
            result = iter(getReservedMessageList(limit, group_method_id))
396
            for line in result:
397 398 399 400 401
              if line.uid in uid_to_duplicate_uid_list_dict:
                continue
              m, uid, uid_list = load(line)
              if m is None:
                uid_to_duplicate_uid_list_dict[uid] += uid_list
402
                continue
403
              uid_to_duplicate_uid_list_dict[uid] = uid_list
404
              cost += m.getObjectCount(activity_tool) * \
405 406 407 408 409 410 411 412
                      m.activity_kw.get('group_method_cost', .01)
              message_list.append(m)
              if cost >= 1:
                # Unreserve extra messages as soon as possible.
                self.makeMessageListAvailable(activity_tool=activity_tool,
                  uid_list=[line.uid for line in result if line.uid != uid])
        activity_tool.SQLBase_processMessage(table=self.sql_table,
          uid=uid_to_duplicate_uid_list_dict.keys())
413
        return message_list, group_method_id, uid_to_duplicate_uid_list_dict
414
    except:
415
      self._log(WARNING, 'Exception while reserving messages.')
416 417 418 419
      if uid_to_duplicate_uid_list_dict:
        to_free_uid_list = uid_to_duplicate_uid_list_dict.keys()
        for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
          to_free_uid_list += uid_list
420
        try:
421 422
          self.makeMessageListAvailable(activity_tool=activity_tool,
                                        uid_list=to_free_uid_list)
423
        except:
424
          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
425
        else:
426
          if to_free_uid_list:
427
            self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
428
      else:
429
        self._log(TRACE, '(no message was reserved)')
430
    return [], None, uid_to_duplicate_uid_list_dict
431

432 433 434 435 436 437 438 439 440
  def _abort(self):
    try:
      transaction.abort()
    except:
      # Unfortunately, database adapters may raise an exception against abort.
      self._log(PANIC,
          'abort failed, thus some objects may be modified accidentally')
      raise

441 442
  # Queue semantic
  def dequeueMessage(self, activity_tool, processing_node):
443
    message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
444 445 446 447 448 449 450
      self.getProcessableMessageList(activity_tool, processing_node)
    if message_list:
      # Remove group_id parameter from group_method_id
      if group_method_id is not None:
        group_method_id = group_method_id.split('\0')[0]
      if group_method_id not in (None, ""):
        method  = activity_tool.invokeGroup
451
        args = (group_method_id, message_list, self.__class__.__name__,
452
                hasattr(self, 'generateMessageUID'))
453 454 455 456 457 458 459 460 461 462 463 464 465 466
        activity_runtime_environment = ActivityRuntimeEnvironment(None)
      else:
        method = activity_tool.invoke
        message = message_list[0]
        args = (message, )
        activity_runtime_environment = ActivityRuntimeEnvironment(message)
      # Commit right before executing messages.
      # As MySQL transaction does not start exactly at the same time as ZODB
      # transactions but a bit later, messages available might be called
      # on objects which are not available - or available in an old
      # version - to ZODB connector.
      # So all connectors must be committed now that we have selected
      # everything needed from MySQL to get a fresh view of ZODB objects.
      transaction.commit()
467
      transaction.begin()
468
      tv = getTransactionalVariable()
469 470 471 472
      tv['activity_runtime_environment'] = activity_runtime_environment
      # Try to invoke
      try:
        method(*args)
473 474 475 476 477 478 479
        # Abort if at least 1 message failed. On next tic, only those that
        # succeeded will be selected because their at_date won't have been
        # increased.
        for m in message_list:
          if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
            raise _DequeueMessageException
        transaction.commit()
480
      except:
481 482 483 484 485 486 487 488 489 490
        exc_info = sys.exc_info()
        if exc_info[1] is not _DequeueMessageException:
          self._log(WARNING,
            'Exception raised when invoking messages (uid, path, method_id) %r'
            % [(m.uid, m.object_path, m.method_id) for m in message_list])
          for m in message_list:
            m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        self._abort()
        exc_info = message_list[0].exc_info
        if exc_info:
491
          try:
492 493 494 495 496 497 498
            # Register it again.
            tv['activity_runtime_environment'] = activity_runtime_environment
            cancel = message.on_error_callback(*exc_info)
            del exc_info, message.exc_info
            transaction.commit()
            if cancel:
              message.setExecutionState(MESSAGE_EXECUTED)
499
          except:
500 501 502
            self._log(WARNING, 'Exception raised when processing error callbacks')
            message.setExecutionState(MESSAGE_NOT_EXECUTED)
            self._abort()
503 504
      self.finalizeMessageExecution(activity_tool, message_list,
                                    uid_to_duplicate_uid_list_dict)
505 506 507
    transaction.commit()
    return not message_list

508 509 510 511 512 513 514
  def finalizeMessageExecution(self, activity_tool, message_list,
                               uid_to_duplicate_uid_list_dict=None):
    """
      If everything was fine, delete all messages.
      If anything failed, make successful messages available (if any), and
      the following rules apply to failed messages:
        - Failures due to ConflictErrors cause messages to be postponed,
515 516
          but their retry count is *not* increased.
        - Failures of messages already above maximum retry count cause them to
517
          be put in a permanent-error state.
518
        - In all other cases, retry count is increased and message is delayed.
519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
    """
    deletable_uid_list = []
    delay_uid_list = []
    final_error_uid_list = []
    make_available_uid_list = []
    notify_user_list = []
    executed_uid_list = deletable_uid_list
    if uid_to_duplicate_uid_list_dict is not None:
      for m in message_list:
        if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
          executed_uid_list = make_available_uid_list
          break
    for m in message_list:
      uid = m.uid
      if m.getExecutionState() == MESSAGE_EXECUTED:
        executed_uid_list.append(uid)
        if uid_to_duplicate_uid_list_dict is not None:
          executed_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
      elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
        # Should duplicate messages follow strictly the original message, or
        # should they be just made available again ?
        if uid_to_duplicate_uid_list_dict is not None:
          make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
542 543
        if (m.exc_type and # m.exc_type may be None
            m.conflict_retry and issubclass(m.exc_type, ConflictError)):
544 545
          delay_uid_list.append(uid)
        else:
546
          max_retry = m.max_retry
547
          retry = m.line.retry
548 549
          if max_retry is not None and retry >= max_retry:
            # Always notify when we stop retrying.
550
            notify_user_list.append((m, False))
551 552
            final_error_uid_list.append(uid)
            continue
553 554
          # In case of infinite retry, notify the user
          # when the default limit is reached.
555
          if max_retry is None and retry == DEFAULT_MAX_RETRY:
556
            notify_user_list.append((m, True))
557 558 559 560
          delay = m.delay
          if delay is None:
            # By default, make delay quadratic to the number of retries.
            delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) / 2
561 562 563 564
          try:
            # Immediately update, because values different for every message
            activity_tool.SQLBase_reactivate(table=self.sql_table,
                                             uid=[uid],
565 566
                                             delay=delay,
                                             retry=1)
567
          except:
568 569
            self._log(WARNING, 'Failed to reactivate %r' % uid)
        make_available_uid_list.append(uid)
570 571 572 573 574 575 576 577
      else: # MESSAGE_NOT_EXECUTABLE
        # 'path' does not point to any object. Activities are normally flushed
        # (without invoking them) when an object is deleted, but this is only
        # an optimisation. There is no efficient and reliable way to do such
        # this, because a concurrent and very long transaction may be about to
        # activate this object, without conflict.
        # So we have to clean up any remaining activity.
        deletable_uid_list.append(uid)
578 579 580 581 582 583 584 585 586 587 588
    if deletable_uid_list:
      try:
        self._retryOnLockError(activity_tool.SQLBase_delMessage,
                               kw={'table': self.sql_table,
                                   'uid': deletable_uid_list})
      except:
        self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
      else:
        self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
    if delay_uid_list:
      try:
589
        # If this is a conflict error, do not increase 'retry' but only delay.
590
        activity_tool.SQLBase_reactivate(table=self.sql_table,
591
          uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, retry=None)
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
      except:
        self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
    if final_error_uid_list:
      try:
        activity_tool.SQLBase_assignMessage(table=self.sql_table,
          uid=final_error_uid_list, processing_node=INVOKE_ERROR_STATE)
      except:
        self._log(ERROR, 'Failed to set message to error state for %r'
                         % final_error_uid_list)
    if make_available_uid_list:
      try:
        self.makeMessageListAvailable(activity_tool=activity_tool,
                                      uid_list=make_available_uid_list)
      except:
        self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
      else:
        self._log(TRACE, 'Freed messages %r' % make_available_uid_list)
    try:
610 611
      for m, retry in notify_user_list:
        m.notifyUser(activity_tool, retry)
612 613 614 615
    except:
      # Notification failures must not cause this method to raise.
      self._log(WARNING,
        'Exception during notification phase of finalizeMessageExecution')
616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656

  def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
    """
      object_path is a tuple
    """
    path = '/'.join(object_path)
    if invoke:
      invoked = set()
      def invoke(message):
        try:
          key = self.generateMessageUID(message)
          if key in invoked:
            return
          invoked.add(key)
        except AttributeError:
          pass
        line = getattr(message, 'line', None)
        validate_value = VALID if line and line.processing_node != -1 else \
                         message.validate(self, activity_tool)
        if validate_value == VALID:
          # Try to invoke the message - what happens if invoke calls flushActivity ??
          activity_tool.invoke(message)
          if message.getExecutionState() != MESSAGE_EXECUTED:
            raise ActivityFlushError('Could not invoke %s on %s'
                                     % (message.method_id, path))
        elif validate_value is INVALID_PATH:
          raise ActivityFlushError('The document %s does not exist' % path)
        else:
          raise ActivityFlushError('Could not validate %s on %s'
                                   % (message.method_id, path))
    for m in activity_tool.getRegisteredMessageList(self):
      if object_path == m.object_path and (
         method_id is None or method_id == m.method_id):
        if invoke:
          invoke(m)
        activity_tool.unregisterMessage(self, m)
    uid_list = []
    for line in self._getMessageList(activity_tool, path=path, processing=0,
        **({'method_id': method_id} if method_id else {})):
      uid_list.append(line.uid)
      if invoke:
657
        invoke(Message.load(line.message, uid=line.uid, line=line))
658 659
    if uid_list:
      activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
660 661 662 663 664 665 666 667 668

  # Required for tests
  def timeShift(self, activity_tool, delay, processing_node=None):
    """
      To simulate time shift, we simply substract delay from
      all dates in message(_queue) table
    """
    activity_tool.SQLBase_timeShift(table=self.sql_table, delay=delay,
                                    processing_node=processing_node)