from __future__ import absolute_import
##############################################################################
#
# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
#                    Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

from six import string_types as basestring
from six.moves import xrange
from Products.ERP5Type.Utils import ensure_list, str2bytes, bytes2str
from collections import defaultdict
from contextlib import contextmanager
from itertools import product, chain
import operator
import sys
import transaction
from random import getrandbits
import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY
from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
  Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage)
from Products.CMFActivity.ActivityRuntimeEnvironment import (
  DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment)
from .Queue import Queue, VALIDATION_ERROR_DELAY
from Products.CMFActivity.Errors import ActivityFlushError
from Products.ERP5Type import Timeout
from Products.ERP5Type.Timeout import Deadline
import six

# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
INVOKE_ERROR_STATE = -2
DEPENDENCY_IGNORED_ERROR_STATE = -10
# Activity uids are stored as 64 bits unsigned integers.
# No need to depend on a database that supports unsigned integers.
# Numbers are large enough without using the MSb. Assuming a busy activity
# table holding one million activities, the probability of triggering a
# conflict when inserting one activity with a 64-bit uid is 0.5e-13. With 63
# bits it increases to 1e-13, which is still very low.
UID_SAFE_BITSIZE = 63
# Inserting an activity batch of 100 activities among one million existing
# activities has a probability of failing of 1e-11. While it should be low
# enough, retries can help lower that. Try 10 times, which should be short
# enough while yielding one order of magnitude collision probability
# improvement.
UID_ALLOCATION_TRY_COUNT = 10
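
# Back-of-the-envelope check of the figures above (a sketch, not from the
# original code): with uniformly random uids, inserting k new uids among n
# existing ones collides with probability ~ n * k / 2**UID_SAFE_BITSIZE,
# e.g. 1e6 * 1 / 2**63 ~= 1e-13 and 1e6 * 100 / 2**63 ~= 1e-11.
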
# Limit the number of UNION-joined subqueries per query when looking for
# blocking activities. Negative, as it is used to slice off the tail of a list.
# XXX: 5000 is known to work on a case on "after_tag" dependency, which fails
# at 5400 with:
#   ProgrammingError: (1064, "memory exhausted near [...]")
_MAX_DEPENDENCY_UNION_SUBQUERY_COUNT = -5000

def sort_message_key(message):
  # same sort key as in SQLBase.getMessageList
  return message.line.priority, message.line.date, message.uid

class _DequeueMessageException(Exception):
  pass

_ITEMGETTER0 = operator.itemgetter(0)
_IDENTITY = lambda x: x

def render_datetime(x):
  return "%.4d-%.2d-%.2d %.2d:%.2d:%09.6f" % x.toZone('UTC').parts()[:6]

if six.PY2:
  _SQLTEST_NO_QUOTE_TYPE_SET = int, float, long
else:
  _SQLTEST_NO_QUOTE_TYPE_SET = int, float
_SQLTEST_NON_SEQUENCE_TYPE_SET = _SQLTEST_NO_QUOTE_TYPE_SET + (DateTime, basestring)

@contextmanager
def SQLLock(db, lock_name, timeout):
  """
  Attempt to acquire a named SQL lock. The outcome of this acquisition is
  returned to the context statement and MUST be checked:
  1: lock acquired
  0: timeout
  """
  lock_name = db.string_literal(lock_name)
  query = db.query
  (_, ((acquired, ), )) = query(
    b'SELECT GET_LOCK(%s, %f)' % (lock_name, timeout),
    max_rows=0,
  )
  if acquired is None:
    raise ValueError('Error acquiring lock')
  try:
    yield acquired
  finally:
    if acquired:
      query(
        b'SELECT RELEASE_LOCK(%s)' % (lock_name, ),
        max_rows=0,
      )
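
# Example use (a sketch; 'some_lock' is a hypothetical name), showing the
# mandatory check of the yielded value:
#   with SQLLock(db, 'some_lock', timeout=1) as acquired:
#     if not acquired:
#       ...  # timed out, give up or look for other work
#     else:
#       ...  # critical section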
# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict():
  sqltest_dict = {}
  def _(name, column=None, op="="):
    if column is None:
      column = name
    column_op = str2bytes(("%s %s " % (column, op)))
    def render(value, render_string):
      if isinstance(value, _SQLTEST_NO_QUOTE_TYPE_SET):
        return column_op + str2bytes(str(value))
      if isinstance(value, DateTime):
        value = render_datetime(value)
      if isinstance(value, basestring):
        return column_op + render_string(value)
      assert op == "=", value
      if value is None: # XXX: see comment in SQLBase._getMessageList
        return str2bytes(column) + b" IS NULL"
      for x in value:
        return str2bytes("%s IN (%s)" % (column, ', '.join(map(
          str if isinstance(x, _SQLTEST_NO_QUOTE_TYPE_SET) else
          render_datetime if isinstance(x, DateTime) else
          lambda v: bytes2str(render_string(v)), value))))
      return b"0"
    sqltest_dict[name] = render
  _('active_process_uid')
  _('group_method_id')
  _('method_id')
  _('path')
  _('processing_node')
  _('serialization_tag')
  _('tag')
  _('retry')
  _('to_date', column="date", op="<=")
  _('uid')
  def renderAbovePriorityDateUid(value, render_string):
    # Strictly dependent on _getMessageList's sort order: given a well-ordered
    # list of values, rendered condition will match the immediate next row in
    # that sort order.
    priority, date, uid = value
    assert isinstance(priority, _SQLTEST_NO_QUOTE_TYPE_SET)
    assert isinstance(uid, _SQLTEST_NO_QUOTE_TYPE_SET)
    return (
        b'(priority>%(priority)d OR (priority=%(priority)d AND '
        b'(date>%(date)s OR (date=%(date)s AND uid>%(uid)d))'
        b'))' % {
        b'priority': priority,
        # render_datetime raises if "date" lacks date API, so no need to check
        b'date': render_string(render_datetime(date)),
        b'uid': uid,
      }
    )
  sqltest_dict['above_priority_date_uid'] = renderAbovePriorityDateUid
  return sqltest_dict
sqltest_dict = sqltest_dict()
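
# For instance (a sketch, with quote being db.string_literal):
#   sqltest_dict['uid']((1, 2, 3), quote) renders b'uid IN (1, 2, 3)',
#   sqltest_dict['to_date'](some_date, quote) renders b"date <= '...'",
#   sqltest_dict['tag'](None, quote) renders b'tag IS NULL'.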

def _validate_after_path_and_method_id(value, render_string):
  path, method_id = value
  return (
    sqltest_dict['method_id'](method_id, render_string) +
    b' AND ' +
    sqltest_dict['path'](path, render_string)
  )

def _validate_after_tag_and_method_id(value, render_string):
  tag, method_id = value
  return (
    sqltest_dict['method_id'](method_id, render_string) +
    b' AND ' +
    sqltest_dict['tag'](tag, render_string)
  )

# Definition of activity dependencies
# key: dependency name (as passed to ActiveObject.activate() & friends)
# value:
# - tuple of column names. If there is more than one, they must be in the
#   same order as the dependency value items expected by the next item
# - callable rendering given values into an SQL condition
#   (value, render_string) -> str
# - minimal applicable processing_node value (excluded)
_DEPENDENCY_TESTER_DICT = {
  'after_method_id': (
    ('method_id', ),
    sqltest_dict['method_id'],
    DEPENDENCY_IGNORED_ERROR_STATE,
  ),
  'after_path': (
    ('path', ),
    sqltest_dict['path'],
    DEPENDENCY_IGNORED_ERROR_STATE,
  ),
  'after_message_uid': (
    ('uid', ),
    sqltest_dict['uid'],
    DEPENDENCY_IGNORED_ERROR_STATE,
  ),
  'after_path_and_method_id': (
    ('path', 'method_id'),
    _validate_after_path_and_method_id,
    DEPENDENCY_IGNORED_ERROR_STATE,
  ),
  'after_tag': (
    ('tag', ),
    sqltest_dict['tag'],
    DEPENDENCY_IGNORED_ERROR_STATE,
  ),
  'after_tag_and_method_id': (
    ('tag', 'method_id'),
    _validate_after_tag_and_method_id,
    DEPENDENCY_IGNORED_ERROR_STATE,
  ),
  'serialization_tag': (
    ('serialization_tag', ),
    sqltest_dict['serialization_tag'],
    -1,
  ),
}
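
# Example caller-side usage (a sketch, not defined in this file):
#   obj.activate(tag='indexed', after_tag='built').reindexObject()
# queues a message which stays unexecuted as long as any activity tagged
# 'built' remains, as rendered by the 'after_tag' tester above.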

def getNow(db):
  """
    Return the UTC date from the point of view of the SQL server.
    Note that this value is not cached, and is not transactional on MySQL
    side.
  """
  return db.query(b"SELECT UTC_TIMESTAMP(6)", 0)[1][0][0]

class SQLBase(Queue):
  """
    Define a set of common methods for SQL-based storage of activities.
  """
  def createTableSQL(self):
    return """\
CREATE TABLE %s (
  `uid` BIGINT UNSIGNED NOT NULL,
  `date` DATETIME(6) NOT NULL,
  `path` VARCHAR(255) NOT NULL,
  `active_process_uid` BIGINT UNSIGNED NULL,
  `method_id` VARCHAR(255) NOT NULL,
  `processing_node` SMALLINT NOT NULL DEFAULT -1,
  `priority` TINYINT NOT NULL DEFAULT 0,
  `node` SMALLINT NOT NULL DEFAULT 0,
  `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
  `tag` VARCHAR(255) NOT NULL,
  `serialization_tag` VARCHAR(255) NOT NULL,
  `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
  `message` LONGBLOB NOT NULL,
  PRIMARY KEY (`uid`),
  KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
  KEY `node2_priority_date` (`processing_node`, `node`, `priority`, `date`),
  KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
  KEY `node2_group_priority_date` (`processing_node`, `node`, `group_method_id`, `priority`, `date`),
  KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
  KEY (`path`, `processing_node`),
  KEY (`active_process_uid`),
  KEY (`method_id`, `processing_node`),
  KEY (`tag`, `processing_node`)
) ENGINE=InnoDB""" % self.sql_table

  def initialize(self, activity_tool, clear):
    db = activity_tool.getSQLConnection()
    create = self.createTableSQL()
    if clear:
      db.query(str2bytes("DROP TABLE IF EXISTS " + self.sql_table))
      db.query(str2bytes(create))
    else:
      src = db.upgradeSchema(create, create_if_not_exists=1,
                                     initialize=self._initialize)
      if src:
        LOG('CMFActivity', INFO, "%r table upgraded\n%s"
            % (self.sql_table, src))
    self._insert_max_payload = (db.getMaxAllowedPacket()
      + len(self._insert_separator)
      - len(self._insert_template % (str2bytes(self.sql_table), b'')))

  def _initialize(self, db, column_list):
      LOG('CMFActivity', ERROR, "Non-empty %r table upgraded."
          " The following added columns could not be initialized: %s"
          % (self.sql_table, ", ".join(column_list)))

  _insert_template = (b"INSERT INTO %s (uid,"
    b" path, active_process_uid, date, method_id, processing_node,"
    b" priority, node, group_method_id, tag, serialization_tag,"
    b" message) VALUES\n(%s)")
  _insert_separator = b"),\n("
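
  # For a batch of two messages, the rendered statement looks like (a sketch):
  #   INSERT INTO <table> (uid, path, ...) VALUES
  #   (@uid+0, '/foo', ...),
  #   (@uid+1, '/bar', ...)
  # where @uid is (re)randomized by prepareQueueMessageList below.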

  def _hasDependency(self, message):
    get = message.activity_kw.get
    return any(
      get(x) is not None
      for x in _DEPENDENCY_TESTER_DICT
    )

  def prepareQueueMessageList(self, activity_tool, message_list):
    db = activity_tool.getSQLConnection()
    quote = db.string_literal
    def insert(reset_uid):
      values = self._insert_separator.join(values_list)
      del values_list[:]
      for _ in xrange(UID_ALLOCATION_TRY_COUNT):
        if reset_uid:
          reset_uid = False
          # Overflow will result in an IntegrityError.
          db.query(b"SET @uid := %d" % getrandbits(UID_SAFE_BITSIZE))
        try:
          db.query(self._insert_template % (str2bytes(self.sql_table), values))
        except MySQLdb.IntegrityError as e:
          if e.args[0] != DUP_ENTRY:
            raise
          reset_uid = True
        else:
          break
      else:
        raise RuntimeError("Maximum retry for prepareQueueMessageList reached")
    i = 0
    reset_uid = True
    values_list = []
    max_payload = self._insert_max_payload
    sep_len = len(self._insert_separator)
    hasDependency = self._hasDependency
    for m in message_list:
      if m.is_registered:
        active_process_uid = m.active_process_uid
        date = m.activity_kw.get('at_date')
        row = b','.join((
          b'@uid+%d' % i,
          quote('/'.join(m.object_path)),
          b'NULL' if active_process_uid is None else str2bytes(str(active_process_uid)),
          b"UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
          quote(m.method_id),
          b'-1' if hasDependency(m) else b'0',
          str2bytes(str(m.activity_kw.get('priority', 1))),
          str2bytes(str(m.activity_kw.get('node', 0))),
          quote(m.getGroupId()),
          quote(m.activity_kw.get('tag', b'')),
          quote(m.activity_kw.get('serialization_tag', b'')),
          quote(Message.dump(m))))
        i += 1
        n = sep_len + len(row)
        max_payload -= n
        if max_payload < 0:
          if values_list:
            insert(reset_uid)
            reset_uid = False
            max_payload = self._insert_max_payload - n
          else:
            raise ValueError("max_allowed_packet too small to insert message")
        values_list.append(row)
    if values_list:
      insert(reset_uid)

  def _getMessageList(self, db, count=1000, src__=0, **kw):
    # XXX: Because most columns have NOT NULL constraint, conditions with None
    #      value should be ignored, instead of trying to render them
    #      (with comparisons with NULL).
    q = db.string_literal
    sql = b'\n  AND '.join(sqltest_dict[k](v, q) for k, v in six.iteritems(kw))
    sql = b"SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
      str2bytes(self.sql_table),
      sql and b'\nWHERE ' + sql,
      b'' if count is None else b'\nLIMIT %d' % count,
    )
    return sql if src__ else Results(db.query(sql, max_rows=0))

  def getMessageList(self, activity_tool, *args, **kw):
    result = self._getMessageList(activity_tool.getSQLConnection(), *args, **kw)
    if type(result) is str: # src__ == 1
      return result,
    class_name = self.__class__.__name__
    return [Message.load(line.message,
                             activity=class_name,
                             uid=line.uid,
                             processing_node=line.processing_node,
                             retry=line.retry)
      for line in result]

  def countMessageSQL(self, quote, **kw):
    return b"SELECT count(*) FROM %s WHERE processing_node > %d AND %s" % (
      str2bytes(self.sql_table), DEPENDENCY_IGNORED_ERROR_STATE, b" AND ".join(
        sqltest_dict[k](v, quote) for (k, v) in six.iteritems(kw) if v
        ) or b"1")

  def hasActivitySQL(self, quote, only_valid=False, only_invalid=False, **kw):
    where = [sqltest_dict[k](v, quote) for (k, v) in six.iteritems(kw) if v]
    if only_valid:
      where.append(b'processing_node > %d' % INVOKE_ERROR_STATE)
    if only_invalid:
      where.append(b'processing_node <= %d' % INVOKE_ERROR_STATE)
    return b"SELECT 1 FROM %s WHERE %s LIMIT 1" % (
      str2bytes(self.sql_table), b" AND ".join(where) or b"1")

  def getPriority(self, activity_tool, processing_node, node_set=None):
    query = activity_tool.getSQLConnection().query
    if node_set is None:
      result = query(
        b"SELECT 3*priority, date"
        b" FROM %s"
        b" WHERE"
        b"  processing_node=0 AND"
        b"  date <= UTC_TIMESTAMP(6)"
        b" ORDER BY priority, date"
        b" LIMIT 1" % str2bytes(self.sql_table),
        0,
      )[1]
    else:
      subquery = lambda *a, **k: str2bytes(bytes2str(b"("
        b"SELECT 3*priority{} AS effective_priority, date"
        b" FROM %s"
        b" WHERE"
        b"  {} AND"
        b"  processing_node=0 AND"
        b"  date <= UTC_TIMESTAMP(6)"
        b" ORDER BY priority, date"
        b" LIMIT 1"
      b")" % str2bytes(self.sql_table)).format(*a, **k))
      result = query(
        b"SELECT *"
        b" FROM (%s) AS t"
        b" ORDER BY effective_priority, date"
        b" LIMIT 1" % (
          b" UNION ALL ".join(
            chain(
              (
                subquery('-1', 'node = %i' % processing_node),
                subquery('', 'node=0'),
              ),
              (
                subquery('-1', 'node = %i' % x)
                for x in node_set
              ),
            ),
          )
        ),
        0,
      )[1]
      if not result:
        # We did not find any activity matching our node (by number nor by
        # family), nor by having no node preference. Look for any other
        # activity we could be allowed to execute.
        # This is slower than the above queries, because it does a range
        # scan, either on the "node" column to sort the set, or on the
        # sorted set to filter negative node values.
        # This is why this query is only executed when the previous one
        # did not find anything.
        result = query(subquery('+1', 'node>0'), 0)[1]
    if result:
      return result[0]
    return Queue.getPriority(self, activity_tool, processing_node, node_set)

  def _retryOnLockError(self, method, args=(), kw={}):
    while True:
      try:
        return method(*args, **kw)
      except ConflictError:
        # Note that this code assumes that a database adapter translates
        # a lock error into a conflict error.
        LOG('SQLBase', INFO, 'Got a lock error, retrying...')

  def _log(self, severity, summary):
    LOG(self.__class__.__name__, severity, summary,
        error=severity > INFO)

  def _getExecutableMessageSet(self, activity_tool, db, message_list):
    """
    Return, from given message list, the set of messages which have all their
    dependencies satisfied.
    """
    # Principle of operation:
    # For each dependency type used in given message list, find all messages
    # matching any of the dependency values used in given message list.
    # This provides the SQL database with structurally simple queries that it
    # should be able to optimise easily.
    # Further refinements:
    # - Any blocked message is ignored in further dependency type lookups (we
    #   already know it is blocked, no point in checking further).
    # - Test the most popular dependency types first, with the expectation
    #   that these will find most of the blockers, reducing the set of
    #   activities left to test and (with the refinement above) reducing the
    #   probability of having to run further queries (if there are other
    #   dependency types to test)
    dependency_tester_dict = _DEPENDENCY_TESTER_DICT
    # dependency_name (str): Something like 'serialization_tag', etc
    # dependency_value (any): dependency_name-dependent structure and meaning.
    # dependency_dict: define the dependencies to check, and which messages are
    # blocked by each found blocker.
    #   [dependency_name][dependency_value] -> message set
    dependency_dict = defaultdict(lambda: defaultdict(set))
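    # e.g. (a sketch): a message queued with after_tag='built' yields
    # dependency_dict['after_tag']['built'] = set([message]).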
    # message_dependency_dict: define which message has which dependencies, to
    # efficiently remove a message from dependency_dict once it is found to be
    # blocked.
    #   [message][dependency_name] -> dependency_value
    message_dependency_dict = defaultdict(dict)
    for message in message_list:
      for (
        dependency_name,
        dependency_value,
      ) in six.iteritems(message.activity_kw):
        try:
          column_list, _, _ = dependency_tester_dict[dependency_name]
        except KeyError:
          continue
        # There are 2 types of dependencies:
        # - monovalued (most), which accepts a single value and a vector of
        #   values.
        # - n-valued (after_path_and_method_id and after_tag_and_method_id)
        #   which accept a n-vector, each item being a single value or a vector
        #   of values.
        # Convert every form into its vector equivalent form, ignoring
        # conditions which cannot match any activity, and (for n-valued)
        # enumerate all possible combinations for later reverse-lookup.
        column_count = len(column_list)
        if column_count == 1:
          if dependency_value is None:
            continue
          dependency_value_list = [
            x
            for x in (
              (dependency_value, )
              if isinstance(
                dependency_value,
                _SQLTEST_NON_SEQUENCE_TYPE_SET,
              ) else
              dependency_value
            )
            # None values cannot match any activity, ignore them.
            if x is not None
          ]
        else:
          try:
            if (
              len(dependency_value) != column_count or
              None in dependency_value
            ):
              # Malformed or impossible to match dependency, ignore.
              continue
          except TypeError:
            # Malformed dependency, ignore.
            continue
          # Note: if any resulting item ends up empty (ex: it only contained
          # None), product will return an empty list.
          dependency_value_list = list(product(*(
            (
              (dependency_column_value, )
              if isinstance(
                dependency_column_value,
                _SQLTEST_NON_SEQUENCE_TYPE_SET,
              ) else
              (x for x in dependency_column_value if x is not None)
            )
            for dependency_column_value in dependency_value
          )))
        if not dependency_value_list:
          continue
        message_dependency_dict[message][dependency_name] = dependency_value_list
        dependency_value_dict = dependency_dict[dependency_name]
        for dependency_value in dependency_value_list:
          dependency_value_dict[dependency_value].add(message)
    # Messages are supposed valid until blockage is found.
    result = set(message_list)
    # Messages for which a blockage is found, so removal of this message from
    # further dependency processing is delayed to the next iteration, to avoid
    # doing such work if there is no such further iteration.
    new_blocked_message_set = set()
    quote = db.string_literal
    table_name_list = activity_tool.getSQLTableNameSet()
    for (
      dependency_name,
      dependency_value_dict,
    ) in sorted(
      six.iteritems(dependency_dict),
      # Test first the condition with the most values.
      # XXX: after_path=('foo', 'bar') counts as 2 points for after_path
      # despite being a single activity. Is there a fairer (while cheap) way ?
      key=lambda dependency_dict_item: sum(
        len(message_set)
        for message_set in six.itervalues(dependency_dict_item[1])
      ),
      reverse=True,
    ):
      # Previous iteration found blocked messages.
      # Find which activities, and remove their values from dependency_dict
      # so these activities are not tested in further queries (we already
      # know they are blocked).
      while new_blocked_message_set:
        blocked_message = new_blocked_message_set.pop()
        for (
          message_dependency_name,
          message_dependency_value_list,
        ) in six.iteritems(message_dependency_dict[blocked_message]):
          message_dependency_value_dict = dependency_dict[message_dependency_name]
          if not message_dependency_value_dict:
            # This dependency was already dropped or evaluated, nothing to
            # cleanup here.
            continue
          for message_dependency_value in message_dependency_value_list:
            message_set = message_dependency_value_dict[message_dependency_value]
            message_set.remove(blocked_message)
            if not message_set:
              # No more message wait for this value for this dependency, drop
              # the entry.
              del message_dependency_value_dict[message_dependency_value]
          # Note: no point in editing dependency_dict if
          # message_dependency_value_dict is empty, the outer loop is working
          # on a copy.
      if not dependency_value_dict:
        # No more non-blocked message for this dependency, skip it.
        continue
      column_list, to_sql, min_processing_node = dependency_tester_dict[
        dependency_name
      ]
      row2key = (
        _ITEMGETTER0
        if len(column_list) == 1 else
        _IDENTITY
      )
      base_sql_suffix = b' WHERE processing_node > %i AND (%%s) LIMIT 1)' % (
        min_processing_node,
      )
      sql_suffix_list = [
        base_sql_suffix % to_sql(dependency_value, quote)
        for dependency_value in dependency_value_dict
      ]
      base_sql_prefix = b'(SELECT %s FROM ' % (
        b','.join([ str2bytes(c) for c in column_list ]),
      )
      subquery_list = [
        base_sql_prefix + str2bytes(table_name) + sql_suffix
        for table_name in table_name_list
        for sql_suffix in sql_suffix_list
      ]
      while subquery_list:
        # Join queries with a UNION, to reduce per-query latency.
        # Also, limit the number of subqueries per query, as their number can
        # largely exceed the number of activities being considered multiplied
        # by the number of activity tables: it is also proportional to the
        # number of distinct values being looked for in the current column.
        for row in db.query(
          b' UNION '.join(subquery_list[_MAX_DEPENDENCY_UNION_SUBQUERY_COUNT:]),
          max_rows=0,
        )[1]:
          # Each row is a value which blocks some activities.
          dependent_message_set = dependency_value_dict[row2key(row)]
          # queue blocked messages for processing in the beginning of next
          # outermost iteration.
          new_blocked_message_set |= dependent_message_set
          # ...but update result immediately, in case there is no next
          # outermost iteration.
          result -= dependent_message_set
        del subquery_list[_MAX_DEPENDENCY_UNION_SUBQUERY_COUNT:]
      dependency_value_dict.clear()
    return result

  def distribute(self, activity_tool, node_count):
    db = activity_tool.getSQLConnection()
    where_kw = {
      'processing_node': -1,
      'to_date': getNow(db),
      'count': READ_MESSAGE_LIMIT,
    }
    validated_count = 0
    while 1:
      result = self._getMessageList(db, **where_kw)
      if not result:
        transaction.commit()
        return False
      message_list = [Message.load(line.message, uid=line.uid, line=line)
                      for line in result]
      message_set = self._getExecutableMessageSet(activity_tool, db, message_list)
      transaction.commit()
      if message_set:
        distributable_uid_set = set()
        serialization_tag_dict = {}
        for message in message_set:
          serialization_tag = message.activity_kw.get('serialization_tag')
          if serialization_tag is None:
            distributable_uid_set.add(message.uid)
          else:
            serialization_tag_dict.setdefault(serialization_tag,
                                              []).append(message)
        for message_list in six.itervalues(serialization_tag_dict):
          # Sort the list of messages to validate the message with the
          # highest score.
          message_list.sort(key=sort_message_key)
          distributable_uid_set.add(message_list[0].uid)
          group_method_id = message_list[0].line.group_method_id
          if group_method_id == '\0':
            continue
          for message in message_list[1:]:
            if group_method_id == message.line.group_method_id:
              distributable_uid_set.add(message.uid)
        distributable_count = len(distributable_uid_set)
        if distributable_count:
          self.assignMessageList(db, 0, distributable_uid_set)
          validated_count += distributable_count
          if validated_count >= MAX_VALIDATED_LIMIT:
            transaction.commit()
            return True
      line = result[-1]
      where_kw['above_priority_date_uid'] = (line.priority, line.date, line.uid)

  def getReservedMessageList(self, db, date, processing_node, limit,
                             group_method_id=None, node_set=None):
    """
      Get and reserve a list of messages.
      limit
        Maximum number of messages to fetch.
        This number is not guaranteed to be reached, as there may not be
        enough messages pending execution.
    """
    assert limit
    quote = db.string_literal
    query = db.query
    args = (
      str2bytes(self.sql_table),
      sqltest_dict['to_date'](date, quote),
      (
        b' AND group_method_id=' + quote(group_method_id)
        if group_method_id else
        b''
      ),
      limit,
    )

    # Note: Not all write accesses to our table are protected by this lock.
    # This lock is not here for data consistency reasons, but to avoid wasting
    # time on SQL deadlocks caused by the varied lock ordering chosen by the
    # database. These queries specifically seem to be extremely prone to such
    # deadlocks, so prevent them from attempting to run in parallel on a given
    # activity table.
    # If more accesses are found to cause a significant waste of time because
    # of deadlocks, then they should acquire such lock as well. But
    # preemptively applying such lock everywhere without checking the amount
    # of waste is unlikely to produce a net gain.
    # XXX: timeout may benefit from being tweaked, but one second seems like a
    # reasonable starting point.
    # XXX: locking could probably be skipped altogether on clusters with few
    # enough processing nodes, as there should be little deadlocks and the
    # tradeoff becomes unfavorable to explicit locks. What threshold to
    # choose ?
    with SQLLock(db, self.sql_table, timeout=1) as acquired:
      if not acquired:
        # This table is busy, check for work to do elsewhere
        return ()
      # Get reservable messages.
      # During normal operation, sorting by date (as the last criterion) is
      # fairer for users and reduces the probability of doing the same work
      # several times (think of an object that is modified several times in a
      # short period of time).
      if node_set is None:
        result = Results(query(
          b"SELECT *"
          b" FROM %s"
          b" WHERE"
          b"  processing_node=0 AND"
          b"  %s%s"
          b" ORDER BY priority, date"
          b" LIMIT %i"
          b" FOR UPDATE" % args,
          0,
        ))
      else:
        subquery = lambda *a, **k: str2bytes(bytes2str(b"("
          b"SELECT *, 3*priority{} AS effective_priority"
          b" FROM %s"
          b" WHERE"
          b"  {} AND"
          b"  processing_node=0 AND"
          b"  %s%s"
          b" ORDER BY priority, date"
          b" LIMIT %i"
        b")" % args).format(*a, **k))
        result = Results(query(
          b"SELECT *"
          b" FROM (%s) AS t"
          b" ORDER BY effective_priority, date"
          b" LIMIT %i" % (
            b" UNION ALL ".join(
              chain(
                (
                  subquery('-1', 'node = %i' % processing_node),
                  subquery('', 'node=0'),
                ),
                (
                  subquery('-1', 'node = %i' % x)
                  for x in node_set
                ),
              ),
            ),
            limit,
          ),
          0,
        ))
        if not result:
          # We did not find any activity matching our node (by number nor by
          # family), nor by having no node preference. Look for any other
          # activity we could be allowed to execute.
          # This is slower than the above queries, because it does a range
          # scan, either on the "node" column to sort the set, or on the
          # sorted set to filter negative node values.
          # This is why this query is only executed when the previous one
          # did not find anything.
          result = Results(query(subquery('+1', 'node>0'), 0))
      if result:
        # Reserve messages.
        uid_list = [x.uid for x in result]
        self.assignMessageList(db, processing_node, uid_list)
        self._log(TRACE, 'Reserved messages: %r' % uid_list)
        return result
    return ()

  def assignMessageList(self, db, state, uid_list):
    """
      Put messages back in given processing_node.
    """
    db.query(("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
      self.sql_table, state, ','.join(map(str, uid_list)))).encode())

  def getProcessableMessageLoader(self, db, processing_node):
    # do not merge anything
    def load(line):
      uid = line.uid
      m = Message.load(line.message, uid=uid, line=line)
      return m, uid, ()
    return load

  def getProcessableMessageList(self, activity_tool, processing_node,
                                node_family_id_list):
    """
      Always true:
        For each reserved message, delete redundant messages when it gets
        reserved (definitely lost, but they are expendable since redundant).

      - reserve a message
      - if this message has a group_method_id:
        - reserve a bunch of messages
        - until the total "cost" of the group goes over 1
          - get one message from the reserved bunch (this message will be
            "needed")
          - update the total cost
        - unreserve "unneeded" messages
      - return still-reserved message list and a group_method_id

      If any error happens in above described process, try to unreserve all
      messages already reserved in that process.
      If it fails, complain loudly that some messages might still be in an
      unclean state.

      Returned values:
        3-tuple:
          - list of messages
          - group_method_id
          - uid_to_duplicate_uid_list_dict
    """
    db = activity_tool.getSQLConnection()
    now_date = getNow(db)
    uid_to_duplicate_uid_list_dict = {}
    try:
      while 1: # not a loop
        # Select messages that were either assigned manually or left
        # unprocessed after a shutdown. Most of the time, there's none.
        # To minimize the probability of deadlocks, we also COMMIT so that a
        # new transaction starts on the first 'FOR UPDATE' query, which is all
        # the more important as the current one started with getPriority().
        result = db.query(b"SELECT * FROM %s WHERE processing_node=%d"
          b" ORDER BY priority, date LIMIT 1\0COMMIT" % (
          str2bytes(self.sql_table), processing_node), 0)
        already_assigned = result[1]
        if already_assigned:
          result = Results(result)
        else:
          result = self.getReservedMessageList(db, now_date, processing_node,
                                               1, node_set=node_family_id_list)
          if not result:
            break
          # So reserved documents are properly released even if load raises.
          for line in result:
            uid_to_duplicate_uid_list_dict[line.uid] = []
        load = self.getProcessableMessageLoader(db, processing_node)
        m, uid, uid_list = load(result[0])
        message_list = [m]
        uid_to_duplicate_uid_list_dict[uid] = uid_list
        group_method_id = m.line.group_method_id
        if group_method_id[0] != '\0':
          # Count the number of objects, to avoid grouping too many of them.
          cost = m.getGroupMethodCost()
          assert 0 < cost <= 1, (self.sql_table, uid)
          count = m.getObjectCount(activity_tool)
          # this is heuristic (messages with same group_method_id
          # are likely to have the same group_method_cost)
          limit = int(1. / cost + 1 - count)
          if limit > 1: # <=> cost * count < 1
            cost *= count
            # Retrieve objects which have the same group method.
            result = iter(already_assigned
              and Results(db.query(b"SELECT * FROM %s"
                b" WHERE processing_node=%d AND group_method_id=%s"
                b" ORDER BY priority, date LIMIT %d" % (
                str2bytes(self.sql_table), processing_node,
                db.string_literal(group_method_id), limit), 0))
                # Do not optimize rare case: keep the code simple by not
                # adding more results from getReservedMessageList if the
                # limit is not reached.
              or self.getReservedMessageList(db, now_date, processing_node,
                limit, group_method_id, node_family_id_list))
            for line in result:
              if line.uid in uid_to_duplicate_uid_list_dict:
                continue
              m, uid, uid_list = load(line)
              if m is None:
                uid_to_duplicate_uid_list_dict[uid] += uid_list
                continue
              uid_to_duplicate_uid_list_dict[uid] = uid_list
              cost += m.getObjectCount(activity_tool) * \
                      m.getGroupMethodCost()
              message_list.append(m)
              if cost >= 1:
                # Unreserve extra messages as soon as possible.
                uid_list = [line.uid for line in result if line.uid != uid]
                if uid_list:
                  self.assignMessageList(db, 0, uid_list)
        return message_list, group_method_id, uid_to_duplicate_uid_list_dict
    except:
      self._log(WARNING, 'Exception while reserving messages.')
      if uid_to_duplicate_uid_list_dict:
        to_free_uid_list = ensure_list(uid_to_duplicate_uid_list_dict.keys())
        for uid_list in six.itervalues(uid_to_duplicate_uid_list_dict):
          to_free_uid_list += uid_list
        try:
          self.assignMessageList(db, 0, to_free_uid_list)
        except:
          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
        else:
          if to_free_uid_list:
            self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
      else:
        self._log(TRACE, '(no message was reserved)')
    return (), None, None

  def _abort(self):
    try:
      transaction.abort()
    except:
      # Unfortunately, database adapters may raise an exception against abort.
      self._log(PANIC,
          'abort failed, thus some objects may be modified accidentally')
      raise

  # Queue semantic
  def dequeueMessage(self, activity_tool, processing_node,
                     node_family_id_list):
    message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
      self.getProcessableMessageList(activity_tool, processing_node,
        node_family_id_list)
    if message_list:
      # Remove group_id parameter from group_method_id
      group_method_id = group_method_id.split('\0')[0]
      if group_method_id != "":
        method = activity_tool.invokeGroup
        args = (group_method_id, message_list, self.__class__.__name__,
                hasattr(self, 'generateMessageUID'))
        activity_runtime_environment = ActivityRuntimeEnvironment(
          None,
          priority=min(x.line.priority for x in message_list),
        )
      else:
        method = activity_tool.invoke
        message, = message_list
        args = message_list
        activity_runtime_environment = ActivityRuntimeEnvironment(message)
      # Commit right before executing messages.
      # As the MySQL transaction does not start exactly at the same time as
      # the ZODB transaction but a bit later, available messages might be
      # invoked on objects which are not available - or only available in an
      # old version - to the ZODB connector.
      # So all connectors must be committed now that we have selected
      # everything needed from MySQL to get a fresh view of ZODB objects.
      transaction.commit()
      transaction.begin()
      # Try to invoke
      try:
        # Refer to Timeout.activity_timeout instead of
        #   from Products.ERP5Type.Timeout import activity_timeout
        # so that the value can be overridden in the Timeout namespace in unit
        # tests.
        offset = Timeout.activity_timeout
        with activity_runtime_environment, Deadline(offset):
          method(*args)
        # Abort if at least 1 message failed. On next tic, only those that
        # succeeded will be selected because their at_date won't have been
        # increased.
        for m in message_list:
          if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
            raise _DequeueMessageException()
        transaction.commit()
      except:
        exc_info = sys.exc_info()
        if not isinstance(exc_info[1], _DequeueMessageException):
          self._log(WARNING,
            'Exception raised when invoking messages (uid, path, method_id) %r'
            % [(m.uid, m.object_path, m.method_id) for m in message_list])
          for m in message_list:
            m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        self._abort()
        exc_info = message_list[0].exc_info
        if exc_info:
          try:
            # Register it again.
            with activity_runtime_environment:
              cancel = message.on_error_callback(*exc_info)
            del exc_info, message.exc_info
            transaction.commit()
            if cancel:
              message.setExecutionState(MESSAGE_EXECUTED)
          except:
            self._log(WARNING, 'Exception raised when processing error callbacks')
            message.setExecutionState(MESSAGE_NOT_EXECUTED)
            self._abort()
      self.finalizeMessageExecution(activity_tool, message_list,
                                    uid_to_duplicate_uid_list_dict)
    transaction.commit()
    return bool(message_list)

  def deleteMessageList(self, db, uid_list):
    db.query(str2bytes("DELETE FROM %s WHERE uid IN (%s)" % (
      self.sql_table, ','.join(map(str, uid_list)))))

  def reactivateMessageList(self, db, uid_list, delay, retry):
    db.query(str2bytes("UPDATE %s SET"
      " date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)"
      "%s WHERE uid IN (%s)" % (
        self.sql_table, delay,
        ", retry = retry + 1" if retry else "",
        ",".join(map(str, uid_list)))))

  def finalizeMessageExecution(self, activity_tool, message_list,
                               uid_to_duplicate_uid_list_dict=None):
    """
      If everything was fine, delete all messages.
      If anything failed, make successful messages available (if any), and
      the following rules apply to failed messages:
        - Failures due to ConflictErrors cause messages to be postponed,
          but their retry count is *not* increased.
        - Failures of messages already above maximum retry count cause them to
          be put in a permanent-error state.
        - In all other cases, retry count is increased and message is delayed.
    """
    db = activity_tool.getSQLConnection()
    deletable_uid_list = []
    delay_uid_list = []
    final_error_uid_list = []
    make_available_uid_list = []
    notify_user_list = []
    executed_uid_list = deletable_uid_list
    if uid_to_duplicate_uid_list_dict is not None:
      for m in message_list:
        if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
          executed_uid_list = make_available_uid_list
          break
    for m in message_list:
      uid = m.uid
      if m.getExecutionState() == MESSAGE_EXECUTED:
        executed_uid_list.append(uid)
        if uid_to_duplicate_uid_list_dict is not None:
          executed_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
      elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
        # Should duplicate messages follow strictly the original message, or
        # should they be just made available again ?
        if uid_to_duplicate_uid_list_dict is not None:
          make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
        if (m.exc_type and # m.exc_type may be None
            (m.conflict_retry if issubclass(m.exc_type, ConflictError) else
             m.exc_type is SkippedMessage)):
          delay_uid_list.append(uid)
        else:
          max_retry = m.max_retry
          retry = m.line.retry
          if max_retry is not None and retry >= max_retry:
            # Always notify when we stop retrying.
            notify_user_list.append((m, False))
            final_error_uid_list.append(uid)
            continue
          # In case of infinite retry, notify the user
          # when the default limit is reached.
          if max_retry is None and retry == DEFAULT_MAX_RETRY:
            notify_user_list.append((m, True))
          delay = m.delay
          if delay is None:
            # By default, make delay quadratic to the number of retries.
            delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) * 2
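            # e.g. assuming a base VALIDATION_ERROR_DELAY of 30 seconds (an
            # assumption, see Queue): retry=1 -> 120s, retry=2 -> 300s,
            # retry=3 -> 600s.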
          try:
            # Update immediately, because the values differ for every message.
            self.reactivateMessageList(db, (uid,), delay, True)
          except:
            self._log(WARNING, 'Failed to reactivate %r' % uid)
        make_available_uid_list.append(uid)
      else: # MESSAGE_NOT_EXECUTABLE
        # 'path' does not point to any object. Activities are normally flushed
        # (without invoking them) when an object is deleted, but this is only
        # an optimisation. There is no efficient and reliable way to do this,
        # because a concurrent and very long transaction may be about to
        # activate this object, without conflict.
        # So we have to clean up any remaining activity.
        deletable_uid_list.append(uid)
    if deletable_uid_list:
      try:
        self._retryOnLockError(self.deleteMessageList, (db, deletable_uid_list))
      except:
        self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
      else:
        self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
    if delay_uid_list:
      try:
        # If this is a conflict error, do not increase 'retry' but only delay.
        self.reactivateMessageList(db, delay_uid_list,
                                   VALIDATION_ERROR_DELAY, False)
      except:
        self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
    if final_error_uid_list:
      try:
        self.assignMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list)
      except:
        self._log(ERROR, 'Failed to set message to error state for %r'
                         % final_error_uid_list)
    if make_available_uid_list:
      try:
        self.assignMessageList(db, 0, make_available_uid_list)
      except:
        self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
      else:
        self._log(TRACE, 'Freed messages %r' % make_available_uid_list)
    try:
      for m, retry in notify_user_list:
        m.notifyUser(activity_tool, retry)
    except:
      # Notification failures must not cause this method to raise.
      self._log(WARNING,
        'Exception during notification phase of finalizeMessageExecution')

  def flush(self, activity_tool, object_path, invoke=0, method_id=None, only_safe=False, **kw):
    """
      object_path is a tuple
    """
    db = activity_tool.getSQLConnection()
    path = '/'.join(object_path)
    if invoke:
      invoked = set()
      def invoke(message):
        try:
          key = self.generateMessageUID(message)
          if key in invoked:
            return
          invoked.add(key)
        except AttributeError:
          pass
        line = getattr(message, 'line', None)
        if (line and line.processing_node != -1 or
            self._getExecutableMessageSet(activity_tool, db, [message])):
          # Try to invoke the message - what happens if invoke calls flushActivity ??
          with ActivityRuntimeEnvironment(message):
            activity_tool.invoke(message)
          if message.getExecutionState() != MESSAGE_EXECUTED:
            raise ActivityFlushError('Could not invoke %s on %s'
                                     % (message.method_id, path))
        else:
          raise ActivityFlushError('Could not validate %s on %s'
                                   % (message.method_id, path))
    for m in activity_tool.getRegisteredMessageList(self):
      if object_path == m.object_path and (
         method_id is None or method_id == m.method_id):
        if invoke:
          invoke(m)
        activity_tool.unregisterMessage(self, m)
    uid_list = []
    for line in self._getMessageList(db, path=path,
        **({'method_id': method_id} if method_id else {})):
      if only_safe and line.processing_node > -2:
        continue
      uid_list.append(line.uid)
      if invoke and line.processing_node <= 0:
        invoke(Message.load(line.message, uid=line.uid, line=line))
    if uid_list:
      self.deleteMessageList(db, uid_list)

  # Required for tests
  def timeShift(self, activity_tool, delay, processing_node=None):
    """
      To simulate time shift, we simply subtract delay from
      all dates in message(_queue) table
    """
    activity_tool.getSQLConnection().query(("UPDATE %s SET"
      " date = DATE_SUB(date, INTERVAL %s SECOND)"
      % (self.sql_table, delay)
      + ('' if processing_node is None else
         "WHERE processing_node=%s" % processing_node)).encode())