from __future__ import absolute_import
##############################################################################
#
# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
#                    Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

from collections import defaultdict
from contextlib import contextmanager
from itertools import product
import operator
import sys
import transaction
from random import getrandbits
import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY
from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
from ZODB.POSException import ConflictError
from Products.CMFActivity.ActivityTool import (
  Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage)
from Products.CMFActivity.ActivityRuntimeEnvironment import (
  DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment)
from .Queue import Queue, VALIDATION_ERROR_DELAY
from Products.CMFActivity.Errors import ActivityFlushError
from Products.ERP5Type import Timeout
from Products.ERP5Type.Timeout import TimeoutReachedError, Deadline

# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
INVOKE_ERROR_STATE = -2
DEPENDENCY_IGNORED_ERROR_STATE = -10
# Activity uids are stored as 64 bits unsigned integers.
# No need to depend on a database that supports unsigned integers.
# Numbers are large enough even without using the MSB. Assuming a busy
# activity table holding one million activities, the probability of
# triggering a conflict when inserting one activity with a 64-bit uid is
# 0.5e-13. With 63 bits it increases to 1e-13, which is still very low.
UID_SAFE_BITSIZE = 63
# Inserting a batch of 100 activities among one million existing ones has a
# failure probability of 1e-11. While this should be low enough, retries can
# help lower it further. Try 10 times, which should be short enough while
# yielding one order of magnitude of collision probability improvement.
UID_ALLOCATION_TRY_COUNT = 10
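# A back-of-the-envelope check of the figures above (illustrative only,
# assuming the usual birthday approximation for uniform random uids):
#   P(collision) ~= existing * inserted / 2**bits
#   1e6 * 1   / 2**63 ~= 1.1e-13  (single insert, cf. 1e-13 above)
#   1e6 * 100 / 2**63 ~= 1.1e-11  (batch of 100, cf. 1e-11 above)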

def sort_message_key(message):
  # same sort key as in SQLBase.getMessageList
  return message.line.priority, message.line.date, message.uid

_DequeueMessageException = Exception()

_ITEMGETTER0 = operator.itemgetter(0)
_IDENTITY = lambda x: x

def render_datetime(x):
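  # Renders a UTC timestamp with microseconds, e.g. (illustrative):
  # render_datetime(DateTime('2007/01/02 03:04:05.6 UTC'))
  # -> '2007-01-02 03:04:05.600000'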
  return "%.4d-%.2d-%.2d %.2d:%.2d:%09.6f" % x.toZone('UTC').parts()[:6]

_SQLTEST_NO_QUOTE_TYPE_SET = int, float, long
_SQLTEST_NON_SEQUENCE_TYPE_SET = _SQLTEST_NO_QUOTE_TYPE_SET + (DateTime, basestring)

@contextmanager
def SQLLock(db, lock_name, timeout):
  """
  Attempt to acquire a named SQL lock. The outcome of this acquisition is
  returned to the context statement and MUST be checked:
  1: lock acquired
  0: timeout
  """
  lock_name = db.string_literal(lock_name)
  query = db.query
  (_, ((acquired, ), )) = query('SELECT GET_LOCK(%s, %f)' % (lock_name, timeout))
  if acquired is None:
    raise ValueError('Error acquiring lock')
  try:
    yield acquired
  finally:
    if acquired:
      query('SELECT RELEASE_LOCK(%s)' % (lock_name, ))
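# Minimal usage sketch (this is how getReservedMessageList uses it below):
#   with SQLLock(db, self.sql_table, timeout=1) as acquired:
#     if not acquired:
#       return ()  # lock is busy, look for work elsewhere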
# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict():
  sqltest_dict = {}
  def _(name, column=None, op="="):
    if column is None:
      column = name
    column_op = "%s %s " % (column, op)
    def render(value, render_string):
      if isinstance(value, _SQLTEST_NO_QUOTE_TYPE_SET):
        return column_op + str(value)
      if isinstance(value, DateTime):
        value = render_datetime(value)
      if isinstance(value, basestring):
        return column_op + render_string(value)
      assert op == "=", value
      if value is None: # XXX: see comment in SQLBase._getMessageList
        return column + " IS NULL"
      for x in value:
        return "%s IN (%s)" % (column, ', '.join(map(
          str if isinstance(x, _SQLTEST_NO_QUOTE_TYPE_SET) else
          render_datetime if isinstance(x, DateTime) else
          render_string, value)))
      return "0"
    sqltest_dict[name] = render
  _('active_process_uid')
  _('group_method_id')
  _('method_id')
  _('path')
  _('processing_node')
  _('serialization_tag')
  _('tag')
  _('retry')
  _('to_date', column="date", op="<=")
  _('uid')
  def renderAbovePriorityDateUid(value, render_string):
    # Strictly dependent on _getMessageList's sort order: given a well-ordered
    # list of values, rendered condition will match the immediate next row in
    # that sort order.
    priority, date, uid = value
    assert isinstance(priority, _SQLTEST_NO_QUOTE_TYPE_SET)
    assert isinstance(uid, _SQLTEST_NO_QUOTE_TYPE_SET)
    return (
        '(priority>%(priority)s OR (priority=%(priority)s AND '
          '(date>%(date)s OR (date=%(date)s AND uid>%(uid)s))'
        '))' % {
        'priority': priority,
        # render_datetime raises if "date" lacks date API, so no need to check
        'date': render_string(render_datetime(date)),
        'uid': uid,
      }
    )
  sqltest_dict['above_priority_date_uid'] = renderAbovePriorityDateUid
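  # For illustration (hypothetical values): (0, <DateTime>, 1234) renders as
  #   (priority>0 OR (priority=0 AND (date>'...' OR (date='...' AND uid>1234))))
  # i.e. a keyset-pagination condition matching rows strictly after the given
  # (priority, date, uid) in the sort order.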
  return sqltest_dict
sqltest_dict = sqltest_dict()
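# For illustration (hypothetical values, with q = db.string_literal):
#   sqltest_dict['processing_node'](0, q)  -> "processing_node = 0"
#   sqltest_dict['tag']('foo', q)          -> "tag = 'foo'"
#   sqltest_dict['uid']((1, 2), q)         -> "uid IN (1, 2)"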

def _validate_after_path_and_method_id(value, render_string):
  path, method_id = value
  return (
    sqltest_dict['method_id'](method_id, render_string) +
    ' AND ' +
    sqltest_dict['path'](path, render_string)
  )

def _validate_after_tag_and_method_id(value, render_string):
  tag, method_id = value
  return (
    sqltest_dict['method_id'](method_id, render_string) +
    ' AND ' +
    sqltest_dict['tag'](tag, render_string)
  )

# Definition of activity dependencies
# key: dependency name (as passed to ActiveObject.activate() & friends)
# value:
# - tuple of column names. If there is more than one, they must be in the
#   same order as the dependency value items expected by the next item
# - callable rendering given values into an SQL condition
#   (value, render_string) -> str
_DEPENDENCY_TESTER_DICT = {
  'after_method_id': (
    ('method_id', ),
    sqltest_dict['method_id'],
  ),
  'after_path': (
    ('path', ),
    sqltest_dict['path'],
  ),
  'after_message_uid': (
    ('uid', ),
    sqltest_dict['uid'],
  ),
  'after_path_and_method_id': (
    ('path', 'method_id'),
    _validate_after_path_and_method_id,
  ),
  'after_tag': (
    ('tag', ),
    sqltest_dict['tag'],
  ),
  'after_tag_and_method_id': (
    ('tag', 'method_id'),
    _validate_after_tag_and_method_id,
  ),
  'serialization_tag': (
    ('serialization_tag', ),
    lambda value, render_string: (
      'processing_node > -1 AND ' +
      sqltest_dict['serialization_tag'](value, render_string)
    ),
  ),
}
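# For illustration (hypothetical values, with q = db.string_literal):
#   column_list, render = _DEPENDENCY_TESTER_DICT['after_path_and_method_id']
#   column_list                       -> ('path', 'method_id')
#   render(('/site/doc', 'edit'), q)  -> "method_id = 'edit' AND path = '/site/doc'"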

def getNow(db):
  """
    Return the UTC date from the point of view of the SQL server.
    Note that this value is not cached, and is not transactional on the
    MySQL side.
  """
  return db.query("SELECT UTC_TIMESTAMP(6)", 0)[1][0][0]

class SQLBase(Queue):
  """
    Define a set of common methods for SQL-based storage of activities.
  """
  def createTableSQL(self):
    return """\
CREATE TABLE %s (
  `uid` BIGINT UNSIGNED NOT NULL,
  `date` DATETIME(6) NOT NULL,
  `path` VARCHAR(255) NOT NULL,
  `active_process_uid` BIGINT UNSIGNED NULL,
  `method_id` VARCHAR(255) NOT NULL,
  `processing_node` SMALLINT NOT NULL DEFAULT -1,
  `priority` TINYINT NOT NULL DEFAULT 0,
  `node` SMALLINT NOT NULL DEFAULT 0,
  `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
  `tag` VARCHAR(255) NOT NULL,
  `serialization_tag` VARCHAR(255) NOT NULL,
  `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
  `message` LONGBLOB NOT NULL,
  PRIMARY KEY (`uid`),
  KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
  KEY `node2_priority_date` (`processing_node`, `node`, `priority`, `date`),
  KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
  KEY `node2_group_priority_date` (`processing_node`, `node`, `group_method_id`, `priority`, `date`),
  KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
  KEY (`path`, `processing_node`),
  KEY (`active_process_uid`),
  KEY (`method_id`, `processing_node`),
  KEY (`tag`, `processing_node`)
) ENGINE=InnoDB""" % self.sql_table

  def initialize(self, activity_tool, clear):
    db = activity_tool.getSQLConnection()
    create = self.createTableSQL()
    if clear:
      db.query("DROP TABLE IF EXISTS " + self.sql_table)
      db.query(create)
    else:
      src = db.upgradeSchema(create, create_if_not_exists=1,
                                     initialize=self._initialize)
      if src:
        LOG('CMFActivity', INFO, "%r table upgraded\n%s"
            % (self.sql_table, src))
    self._insert_max_payload = (db.getMaxAllowedPacket()
      + len(self._insert_separator)
      - len(self._insert_template % (self.sql_table, '')))
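    # Sketch of the accounting above: a batch of k rows costs
    #   len(template) + sum(len(row) for each row) + (k-1) * len(separator)
    # bytes, so budgeting max_allowed_packet + len(separator) - len(template)
    # lets prepareQueueMessageList simply charge len(separator) + len(row)
    # per row (see below).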

  def _initialize(self, db, column_list):
      LOG('CMFActivity', ERROR, "Non-empty %r table upgraded."
          " The following added columns could not be initialized: %s"
          % (self.sql_table, ", ".join(column_list)))

  _insert_template = ("INSERT INTO %s (uid,"
    " path, active_process_uid, date, method_id, processing_node,"
    " priority, node, group_method_id, tag, serialization_tag,"
    " message) VALUES\n(%s)")
  _insert_separator = "),\n("

  def _hasDependency(self, message):
    get = message.activity_kw.get
    return any(
      get(x) is not None
      for x in _DEPENDENCY_TESTER_DICT
    )

  def prepareQueueMessageList(self, activity_tool, message_list):
    db = activity_tool.getSQLConnection()
    quote = db.string_literal
    def insert(reset_uid):
      values = self._insert_separator.join(values_list)
      del values_list[:]
      for _ in xrange(UID_ALLOCATION_TRY_COUNT):
        if reset_uid:
          reset_uid = False
          # Overflow will result in an IntegrityError.
          db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE))
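          # Each row references @uid+<offset>, so re-rolling @uid here
          # re-allocates uids for the whole batch at once.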
        try:
          db.query(self._insert_template % (self.sql_table, values))
        except MySQLdb.IntegrityError, (code, _):
          if code != DUP_ENTRY:
            raise
          reset_uid = True
        else:
          break
      else:
        raise RuntimeError("Maximum retry for prepareQueueMessageList reached")
    i = 0
    reset_uid = True
    values_list = []
    max_payload = self._insert_max_payload
    sep_len = len(self._insert_separator)
    hasDependency = self._hasDependency
    for m in message_list:
      if m.is_registered:
        active_process_uid = m.active_process_uid
        date = m.activity_kw.get('at_date')
        row = ','.join((
          '@uid+%s' % i,
          quote('/'.join(m.object_path)),
          'NULL' if active_process_uid is None else str(active_process_uid),
          "UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
          quote(m.method_id),
          '-1' if hasDependency(m) else '0',
          str(m.activity_kw.get('priority', 1)),
          str(m.activity_kw.get('node', 0)),
          quote(m.getGroupId()),
          quote(m.activity_kw.get('tag', '')),
          quote(m.activity_kw.get('serialization_tag', '')),
          quote(Message.dump(m))))
        i += 1
        n = sep_len + len(row)
        max_payload -= n
        if max_payload < 0:
          if values_list:
            insert(reset_uid)
            reset_uid = False
            max_payload = self._insert_max_payload - n
          else:
            raise ValueError("max_allowed_packet too small to insert message")
        values_list.append(row)
    if values_list:
      insert(reset_uid)

  def _getMessageList(self, db, count=1000, src__=0, **kw):
    # XXX: Because most columns have NOT NULL constraint, conditions with None
    #      value should be ignored, instead of trying to render them
    #      (with comparisons with NULL).
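    # For illustration (hypothetical call, assuming a table named 'message'):
    #   self._getMessageList(db, processing_node=0, count=10, src__=1) ->
    #     SELECT * FROM message
    #     WHERE processing_node = 0
    #     ORDER BY priority, date, uid
    #     LIMIT 10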
    q = db.string_literal
    sql = '\n  AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems())
    sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
      self.sql_table,
      sql and '\nWHERE ' + sql,
      '' if count is None else '\nLIMIT %d' % count,
    )
    return sql if src__ else Results(db.query(sql, max_rows=0))

  def getMessageList(self, activity_tool, *args, **kw):
    result = self._getMessageList(activity_tool.getSQLConnection(), *args, **kw)
    if type(result) is str: # src__ == 1
      return result,
    class_name = self.__class__.__name__
    return [Message.load(line.message,
                             activity=class_name,
                             uid=line.uid,
                             processing_node=line.processing_node,
                             retry=line.retry)
      for line in result]

  def countMessageSQL(self, quote, **kw):
    return "SELECT count(*) FROM %s WHERE processing_node > %d AND %s" % (
      self.sql_table, DEPENDENCY_IGNORED_ERROR_STATE, " AND ".join(
        sqltest_dict[k](v, quote) for (k, v) in kw.iteritems() if v
        ) or "1")

  def hasActivitySQL(self, quote, only_valid=False, only_invalid=False, **kw):
    where = [sqltest_dict[k](v, quote) for (k, v) in kw.iteritems() if v]
    if only_valid:
      where.append('processing_node > %d' % INVOKE_ERROR_STATE)
    if only_invalid:
      where.append('processing_node <= %d' % INVOKE_ERROR_STATE)
    return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
      self.sql_table, " AND ".join(where) or "1")

  def getPriority(self, activity_tool, processing_node, node_set=None):
    if node_set is None:
      q = ("SELECT 3*priority, date FROM %s"
        " WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
        " ORDER BY priority, date LIMIT 1" % self.sql_table)
    else:
      subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
        " WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
        " ORDER BY priority, date LIMIT 1)" % self.sql_table).format
      node = 'node=%s' % processing_node
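      # effective_priority spreads 3*priority by -1/0/+1 so that, at equal
      # priority, activities targeting this node win (-1), untargeted ones
      # (node=0) tie (0), and those targeting other nodes lose (+1).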
      # "ALL" on all but one, to incur deduplication cost only once.
      # "UNION ALL" between the two naturally distinct sets.
      q = ("SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
        " ORDER BY effective_priority, date LIMIT 1" % (
          subquery(-1, node),
          subquery('', 'node=0'),
          subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
          ' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
        ))
    result = activity_tool.getSQLConnection().query(q, 0)[1]
    if result:
      return result[0]
    return Queue.getPriority(self, activity_tool, processing_node, node_set)

  def _retryOnLockError(self, method, args=(), kw={}):
    while True:
      try:
        return method(*args, **kw)
      except ConflictError:
        # Note that this code assumes that a database adapter translates
        # a lock error into a conflict error.
        LOG('SQLBase', INFO, 'Got a lock error, retrying...')

  def _log(self, severity, summary):
    LOG(self.__class__.__name__, severity, summary,
        error=severity > INFO)

  def _getExecutableMessageSet(self, activity_tool, db, message_list):
    """
    Return, from given message list, the set of messages which have all their
    dependencies satisfied.
    """
    # Principle of operation:
    # For each dependency type used in given message list, find all messages
    # matching any of the dependency values used in given message list.
    # This provides the SQL database with structurally simple queries that it
    # should be able to optimise easily.
    # Further refinements:
    # - Any blocked message is ignored in further dependency type lookups (we
    #   already know it is blocked, no point in checking further).
    # - Test the most popular dependency types first, with the expectation
    #   that these will find most of the blockers, reducing the set of
    #   activities left to test and (with the refinement above) reducing the
    #   probability of having to run further queries (if there are other
    #   dependency types to test)
    dependency_tester_dict = _DEPENDENCY_TESTER_DICT
    # dependency_name (str): Something like 'serialization_tag', etc
    # dependency_value (any): dependency_name-dependent structure and meaning.
    # dependency_dict: define the dependencies to check, and which messages are
    # blocked by each found blocker.
    #   [dependency_name][dependency_value] -> message set
    dependency_dict = defaultdict(lambda: defaultdict(set))
    # message_dependency_dict: define which message has which dependencies, to
    # efficiently remove a message from dependency_dict once it is found to be
    # blocked.
    #   [message][dependency_name] -> dependency_value
    message_dependency_dict = defaultdict(dict)
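    # For illustration (hypothetical): a message activated with
    # after_tag=('a', 'b') ends up registered as
    #   dependency_dict['after_tag']['a'] -> {message}
    #   dependency_dict['after_tag']['b'] -> {message}
    #   message_dependency_dict[message]['after_tag'] -> ['a', 'b']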
    for message in message_list:
      for (
        dependency_name,
        dependency_value,
      ) in message.activity_kw.iteritems():
        try:
          column_list, _ = dependency_tester_dict[dependency_name]
        except KeyError:
          continue
        # There are 2 types of dependencies:
        # - monovalued (most), which accepts a single value and a vector of
        #   values.
        # - n-valued (after_path_and_method_id and after_tag_and_method_id)
        #   which accept a n-vector, each item being a single value or a vector
        #   of values.
        # Convert every form into its vector equivalent form, ignoring
        # conditions which cannot match any activity, and (for n-valued)
        # enumerate all possible combinations for later reverse-lookup.
        column_count = len(column_list)
        if column_count == 1:
          if dependency_value is None:
            continue
          dependency_value_list = [
            x
            for x in (
              (dependency_value, )
              if isinstance(
                dependency_value,
                _SQLTEST_NON_SEQUENCE_TYPE_SET,
              ) else
              dependency_value
            )
            # None values cannot match any activity, ignore them.
            if x is not None
          ]
        else:
          try:
            if (
              len(dependency_value) != column_count or
              None in dependency_value
            ):
              # Malformed or impossible to match dependency, ignore.
              continue
          except TypeError:
            # Malformed dependency, ignore.
            continue
          # Note: if any resulting item ends up empty (e.g. it only contained
          # None), product will return an empty list.
          dependency_value_list = list(product(*(
            (
              (dependency_column_value, )
              if isinstance(
                dependency_column_value,
                _SQLTEST_NON_SEQUENCE_TYPE_SET,
              ) else
              (x for x in dependency_column_value if x is not None)
            )
            for dependency_column_value in dependency_value
          )))
        if not dependency_value_list:
          continue
        message_dependency_dict[message][dependency_name] = dependency_value_list
        dependency_value_dict = dependency_dict[dependency_name]
        for dependency_value in dependency_value_list:
          dependency_value_dict[dependency_value].add(message)
    # Messages are supposed valid until blockage is found.
    result = set(message_list)
    # Messages for which a blockage is found, so removal of this message from
    # further dependency processing is delayed to the next iteration, to avoid
    # doing such work if there is no such further iteration.
    new_blocked_message_set = set()
    quote = db.string_literal
    table_name_list = activity_tool.getSQLTableNameSet()
    for (
      dependency_name,
      dependency_value_dict,
    ) in sorted(
      dependency_dict.iteritems(),
      # Test first the condition with the most values.
      # XXX: after_path=('foo', 'bar') counts as 2 points for after_path
      # despite being a single activity. Is there a fairer (while cheap) way?
      key=lambda dependency_dict_item: sum(
        len(message_set)
        for message_set in dependency_dict_item[1].itervalues()
      ),
      reverse=True,
    ):
      # Previous iteration found blocked messages.
      # Find which activities, and remove their values from dependency_dict
      # so these activities are not tested in further queries (we already
      # know they are blocked).
      while new_blocked_message_set:
        blocked_message = new_blocked_message_set.pop()
        for (
          message_dependency_name,
          message_dependency_value_list,
        ) in message_dependency_dict[blocked_message].iteritems():
          message_dependency_value_dict = dependency_dict[message_dependency_name]
          if not message_dependency_value_dict:
            # This dependency was already dropped or evaluated, nothing to
            # cleanup here.
            continue
          for message_dependency_value in message_dependency_value_list:
            message_set = message_dependency_value_dict[message_dependency_value]
            message_set.remove(blocked_message)
            if not message_set:
              # No more messages waiting for this value for this dependency, drop
              # the entry.
              del message_dependency_value_dict[message_dependency_value]
          # Note: no point in editing dependency_dict if
          # message_dependency_value_dict is empty, the outer loop is working
          # on a copy.
      if not dependency_value_dict:
        # No more non-blocked message for this dependency, skip it.
        continue
      column_list, to_sql = dependency_tester_dict[dependency_name]
      row2key = (
        _ITEMGETTER0
        if len(column_list) == 1 else
        _IDENTITY
      )
      base_sql_suffix = ' WHERE processing_node > %i AND (%%s) LIMIT 1)' % (
        DEPENDENCY_IGNORED_ERROR_STATE,
      )
      sql_suffix_list = [
        base_sql_suffix % to_sql(dependency_value, quote)
        for dependency_value in dependency_value_dict
      ]
      base_sql_prefix = '(SELECT %s FROM ' % (
        ','.join(column_list),
      )
      for row in db.query(
        ' UNION '.join(
          base_sql_prefix + table_name + sql_suffix
          for table_name in table_name_list
          for sql_suffix in sql_suffix_list
        ),
        max_rows=0,
      )[1]:
        # Each row is a value which blocks some activities.
        dependent_message_set = dependency_value_dict[row2key(row)]
        # queue blocked messages for processing in the beginning of next
        # outermost iteration.
        new_blocked_message_set.update(dependent_message_set)
        # ...but update result immediately, in case there is no next
        # outermost iteration.
        result.difference_update(dependent_message_set)
      dependency_value_dict.clear()
    return result

  def distribute(self, activity_tool, node_count):
    db = activity_tool.getSQLConnection()
    now_date = getNow(db)
    where_kw = {
      'processing_node': -1,
      'to_date': now_date,
      'count': READ_MESSAGE_LIMIT,
    }
    validated_count = 0
    while 1:
      result = self._getMessageList(db, **where_kw)
      if not result:
        return
      transaction.commit()
      message_list = [Message.load(line.message, uid=line.uid, line=line)
                      for line in result]
      message_set = self._getExecutableMessageSet(activity_tool, db, message_list)
      transaction.commit()
      if message_set:
        distributable_uid_set = set()
        serialization_tag_dict = {}
        for message in message_set:
          serialization_tag = message.activity_kw.get('serialization_tag')
          if serialization_tag is None:
            distributable_uid_set.add(message.uid)
          else:
            serialization_tag_dict.setdefault(serialization_tag,
                                              []).append(message)
        for message_list in serialization_tag_dict.itervalues():
          # Sort list of messages to validate the message with highest score
          message_list.sort(key=sort_message_key)
          distributable_uid_set.add(message_list[0].uid)
          group_method_id = message_list[0].line.group_method_id
          if group_method_id == '\0':
            continue
          for message in message_list[1:]:
            if group_method_id == message.line.group_method_id:
              distributable_uid_set.add(message.uid)
        distributable_count = len(distributable_uid_set)
        if distributable_count:
          self.assignMessageList(db, 0, distributable_uid_set)
          validated_count += distributable_count
          if validated_count >= MAX_VALIDATED_LIMIT:
            return
      line = result[-1]
      where_kw['above_priority_date_uid'] = (line.priority, line.date, line.uid)

  def getReservedMessageList(self, db, date, processing_node, limit,
                             group_method_id=None, node_set=None):
    """
      Get and reserve a list of messages.
      limit
        Maximum number of messages to fetch.
        This number is not guaranteed to be reached, as there may not be
        enough messages pending execution.
    """
    assert limit
    quote = db.string_literal
    query = db.query
    args = (self.sql_table, sqltest_dict['to_date'](date, quote),
            ' AND group_method_id=' + quote(group_method_id)
            if group_method_id else '', limit)

    # Note: Not all write accesses to our table are protected by this lock.
    # This lock is not here for data consistency reasons, but to avoid wasting
    # time on SQL deadlocks caused by the varied lock ordering chosen by the
    # database. These queries specifically seem to be extremely prone to such
    # deadlocks, so prevent them from attempting to run in parallel on a given
    # activity table.
    # If more accesses are found to cause a significant waste of time because
    # of deadlocks, then they should acquire such lock as well. But
    # preemptively applying such lock everywhere without checking the amount
    # of waste is unlikely to produce a net gain.
    # XXX: timeout may benefit from being tweaked, but one second seems like a
    # reasonable starting point.
    # XXX: locking could probably be skipped altogether on clusters with few
    # enough processing nodes, as there should be little deadlocks and the
    # tradeoff becomes unfavorable to explicit locks. What threshold to
    # choose?
    with SQLLock(db, self.sql_table, timeout=1) as acquired:
      if not acquired:
        # This table is busy, check for work to do elsewhere
        return ()
      # Get reservable messages.
      # During normal operation, sorting by date (as the last criterion) is
      # fairer for users and reduces the probability of doing the same work
      # several times (think of an object that is modified several times in a
      # short period of time).
      if node_set is None:
        result = Results(query(
          "SELECT * FROM %s WHERE processing_node=0 AND %s%s"
          " ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
      else:
        # We'd like to write
        #   ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
        # but this makes indices inefficient.
        subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
          " WHERE {} AND processing_node=0 AND %s%s"
          " ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
        node = 'node=%s' % processing_node
        result = Results(query(
          # "ALL" on all but one, to incur deduplication cost only once.
          # "UNION ALL" between the two naturally distinct sets.
          "SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
          " ORDER BY effective_priority, date LIMIT %s"% (
              subquery(-1, node),
              subquery('', 'node=0'),
              subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
              ' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
              limit), 0))
      if result:
        # Reserve messages.
        uid_list = [x.uid for x in result]
        self.assignMessageList(db, processing_node, uid_list)
        self._log(TRACE, 'Reserved messages: %r' % uid_list)
        return result
    return ()

  def assignMessageList(self, db, state, uid_list):
    """
      Set processing_node of the given messages to the given state.
    """
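    # Note: the DB adaptor splits queries on '\0', so the trailing COMMIT
    # runs as a second statement, releasing row locks as soon as possible.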
    db.query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
      self.sql_table, state, ','.join(map(str, uid_list))))

  def getProcessableMessageLoader(self, db, processing_node):
    # do not merge anything
    def load(line):
      uid = line.uid
      m = Message.load(line.message, uid=uid, line=line)
      return m, uid, ()
    return load

  def getProcessableMessageList(self, activity_tool, processing_node,
                                node_family_id_list):
    """
      Always true:
        For each reserved message, delete redundant messages when it gets
          reserved (definitely lost, but they are expendable since redundant).

      - reserve a message
      - if this message has a group_method_id:
        - reserve a bunch of messages
        - until the total "cost" of the group goes over 1
          - get one message from the reserved bunch (this message will be
            "needed")
          - update the total cost
        - unreserve "unneeded" messages
      - return still-reserved message list and a group_method_id

      If any error happens in above described process, try to unreserve all
      messages already reserved in that process.
      If it fails, complain loudly that some messages might still be in an
      unclean state.

      Returned values:
        3-tuple:
          - list of messages
          - group_method_id
          - uid_to_duplicate_uid_list_dict
    """
    db = activity_tool.getSQLConnection()
    now_date = getNow(db)
    uid_to_duplicate_uid_list_dict = {}
    try:
      while 1: # not a loop
        # Select messages that were either assigned manually or left
        # unprocessed after a shutdown. Most of the time, there's none.
        # To minimize the probability of deadlocks, we also COMMIT so that a
        # new transaction starts on the first 'FOR UPDATE' query, which is all
        # the more important as the current one started with getPriority().
        result = db.query("SELECT * FROM %s WHERE processing_node=%s"
          " ORDER BY priority, date LIMIT 1\0COMMIT" % (
          self.sql_table, processing_node), 0)
        already_assigned = result[1]
        if already_assigned:
          result = Results(result)
        else:
          result = self.getReservedMessageList(db, now_date, processing_node,
                                               1, node_set=node_family_id_list)
          if not result:
            break
          # So reserved documents are properly released even if load raises.
          for line in result:
            uid_to_duplicate_uid_list_dict[line.uid] = []
        load = self.getProcessableMessageLoader(db, processing_node)
        m, uid, uid_list = load(result[0])
        message_list = [m]
        uid_to_duplicate_uid_list_dict[uid] = uid_list
        group_method_id = m.line.group_method_id
        if group_method_id[0] != '\0':
          # Count the number of objects, to avoid invoking too many at once.
          cost = m.getGroupMethodCost()
          assert 0 < cost <= 1, (self.sql_table, uid)
          count = m.getObjectCount(activity_tool)
          # this is heuristic (messages with same group_method_id
          # are likely to have the same group_method_cost)
          limit = int(1. / cost + 1 - count)
          if limit > 1: # <=> cost * count < 1
            cost *= count
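            # e.g. (illustrative): cost=0.05 and count=1 give limit=20, so up
            # to 19 more messages may be fetched before the accumulated cost
            # reaches 1.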
            # Retrieve objects which have the same group method.
            result = iter(already_assigned
              and Results(db.query("SELECT * FROM %s"
                " WHERE processing_node=%s AND group_method_id=%s"
                " ORDER BY priority, date LIMIT %s" % (
                self.sql_table, processing_node,
                db.string_literal(group_method_id), limit), 0))
                # Do not optimize rare case: keep the code simple by not
                # adding more results from getReservedMessageList if the
                # limit is not reached.
              or self.getReservedMessageList(db, now_date, processing_node,
                limit, group_method_id, node_family_id_list))
            for line in result:
              if line.uid in uid_to_duplicate_uid_list_dict:
                continue
              m, uid, uid_list = load(line)
              if m is None:
                uid_to_duplicate_uid_list_dict[uid] += uid_list
                continue
              uid_to_duplicate_uid_list_dict[uid] = uid_list
              cost += m.getObjectCount(activity_tool) * \
                      m.getGroupMethodCost()
              message_list.append(m)
              if cost >= 1:
                # Unreserve extra messages as soon as possible.
                uid_list = [line.uid for line in result if line.uid != uid]
                if uid_list:
                  self.assignMessageList(db, 0, uid_list)
        return message_list, group_method_id, uid_to_duplicate_uid_list_dict
    except:
      self._log(WARNING, 'Exception while reserving messages.')
      if uid_to_duplicate_uid_list_dict:
        to_free_uid_list = uid_to_duplicate_uid_list_dict.keys()
        for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
          to_free_uid_list += uid_list
        try:
          self.assignMessageList(db, 0, to_free_uid_list)
        except:
          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
        else:
          if to_free_uid_list:
            self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
      else:
        self._log(TRACE, '(no message was reserved)')
    return (), None, None

  def _abort(self):
    try:
      transaction.abort()
    except:
      # Unfortunately, database adapters may raise an exception against abort.
      self._log(PANIC,
          'abort failed, thus some objects may be modified accidentally')
      raise

  # Queue semantics
  def dequeueMessage(self, activity_tool, processing_node,
                     node_family_id_list):
    message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
      self.getProcessableMessageList(activity_tool, processing_node,
        node_family_id_list)
    if message_list:
      # Remove group_id parameter from group_method_id
      group_method_id = group_method_id.split('\0')[0]
      if group_method_id != "":
        method = activity_tool.invokeGroup
        args = (group_method_id, message_list, self.__class__.__name__,
                hasattr(self, 'generateMessageUID'))
        activity_runtime_environment = ActivityRuntimeEnvironment(
          None,
          priority=min(x.line.priority for x in message_list),
        )
      else:
        method = activity_tool.invoke
        message, = message_list
        args = message_list
        activity_runtime_environment = ActivityRuntimeEnvironment(message)
      # Commit right before executing messages.
      # As MySQL transactions do not start exactly at the same time as ZODB
      # transactions but a bit later, available messages might be invoked
      # on objects which are not yet available - or only available in an old
      # version - to the ZODB connector.
      # So all connectors must be committed now that we have selected
      # everything needed from MySQL, to get a fresh view of ZODB objects.
      transaction.commit()
      transaction.begin()
      # Try to invoke
      try:
        # Refer to Timeout.activity_timeout instead of
        #   from Products.ERP5Type.Timeout import activity_timeout
        # so that the value can be overridden in the Timeout namespace in
        # unit tests.
        offset = Timeout.activity_timeout
        with activity_runtime_environment, Deadline(offset):
          method(*args)
        # Abort if at least 1 message failed. On next tic, only those that
        # succeeded will be selected because their at_date won't have been
        # increased.
        for m in message_list:
          if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
            raise _DequeueMessageException
        transaction.commit()
      except:
        exc_info = sys.exc_info()
        if exc_info[1] is not _DequeueMessageException:
          self._log(WARNING,
            'Exception raised when invoking messages (uid, path, method_id) %r'
            % [(m.uid, m.object_path, m.method_id) for m in message_list])
          for m in message_list:
            m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        self._abort()
        exc_info = message_list[0].exc_info
        if exc_info:
          try:
            # Register it again.
            with activity_runtime_environment:
              cancel = message.on_error_callback(*exc_info)
            del exc_info, message.exc_info
            transaction.commit()
            if cancel:
              message.setExecutionState(MESSAGE_EXECUTED)
          except:
            self._log(WARNING, 'Exception raised when processing error callbacks')
            message.setExecutionState(MESSAGE_NOT_EXECUTED)
            self._abort()
      self.finalizeMessageExecution(activity_tool, message_list,
                                    uid_to_duplicate_uid_list_dict)
    transaction.commit()
    return not message_list

  def deleteMessageList(self, db, uid_list):
    db.query("DELETE FROM %s WHERE uid IN (%s)" % (
      self.sql_table, ','.join(map(str, uid_list))))

  def reactivateMessageList(self, db, uid_list, delay, retry):
    db.query("UPDATE %s SET"
      " date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)"
      "%s WHERE uid IN (%s)" % (
        self.sql_table, delay,
        ", priority = priority + 1, retry = retry + 1" if retry else "",
        ",".join(map(str, uid_list))))

  def finalizeMessageExecution(self, activity_tool, message_list,
                               uid_to_duplicate_uid_list_dict=None):
    """
      If everything was fine, delete all messages.
      If anything failed, make successful messages available (if any), and
      the following rules apply to failed messages:
        - Failures due to ConflictErrors cause messages to be postponed,
          but their retry count is *not* increased.
        - Failures of messages already above maximum retry count cause them to
          be put in a permanent-error state.
        - In all other cases, retry count is increased and message is delayed.
    """
    db = activity_tool.getSQLConnection()
    deletable_uid_list = []
    delay_uid_list = []
    final_error_uid_list = []
    make_available_uid_list = []
    notify_user_list = []
    executed_uid_list = deletable_uid_list
    if uid_to_duplicate_uid_list_dict is not None:
      for m in message_list:
        if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
          executed_uid_list = make_available_uid_list
          break
    for m in message_list:
      uid = m.uid
      if m.getExecutionState() == MESSAGE_EXECUTED:
        executed_uid_list.append(uid)
        if uid_to_duplicate_uid_list_dict is not None:
          executed_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
      elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
        # Should duplicate messages follow strictly the original message, or
        # should they be just made available again?
        if uid_to_duplicate_uid_list_dict is not None:
          make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
        if (m.exc_type and # m.exc_type may be None
            (m.conflict_retry if issubclass(m.exc_type, ConflictError) else
             m.exc_type is SkippedMessage)):
          delay_uid_list.append(uid)
        else:
          max_retry = m.max_retry
          retry = m.line.retry
          if (max_retry is not None and retry >= max_retry) or \
              m.exc_type == TimeoutReachedError:
            # Always notify when we stop retrying.
            notify_user_list.append((m, False))
            final_error_uid_list.append(uid)
            continue
          # In case of infinite retry, notify the user
          # when the default limit is reached.
          if max_retry is None and retry == DEFAULT_MAX_RETRY:
            notify_user_list.append((m, True))
          delay = m.delay
          if delay is None:
            # By default, make delay quadratic to the number of retries.
            delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) * 2
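            # i.e. 2, 4, 10, 20, 34... times VALIDATION_ERROR_DELAY for
            # retry counts 0, 1, 2, 3, 4...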
          try:
            # Update immediately, because the values differ for every message.
            self.reactivateMessageList(db, (uid,), delay, True)
          except:
            self._log(WARNING, 'Failed to reactivate %r' % uid)
        make_available_uid_list.append(uid)
      else: # MESSAGE_NOT_EXECUTABLE
        # 'path' does not point to any object. Activities are normally flushed
        # (without invoking them) when an object is deleted, but this is only
        # an optimisation. There is no efficient and reliable way to do this,
        # because a concurrent and very long transaction may be about to
        # activate this object, without conflict.
        # So we have to clean up any remaining activity.
        deletable_uid_list.append(uid)
    if deletable_uid_list:
      try:
        self._retryOnLockError(self.deleteMessageList, (db, deletable_uid_list))
      except:
        self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
      else:
        self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
    if delay_uid_list:
      try:
        # If this is a conflict error, do not increase 'retry' but only delay.
        self.reactivateMessageList(db, delay_uid_list,
                                   VALIDATION_ERROR_DELAY, False)
      except:
        self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
    if final_error_uid_list:
      try:
        self.assignMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list)
      except:
        self._log(ERROR, 'Failed to set message to error state for %r'
                         % final_error_uid_list)
    if make_available_uid_list:
      try:
        self.assignMessageList(db, 0, make_available_uid_list)
      except:
        self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
      else:
        self._log(TRACE, 'Freed messages %r' % make_available_uid_list)
    try:
      for m, retry in notify_user_list:
        m.notifyUser(activity_tool, retry)
    except:
      # Notification failures must not cause this method to raise.
      self._log(WARNING,
        'Exception during notification phase of finalizeMessageExecution')

  def flush(self, activity_tool, object_path, invoke=0, method_id=None, only_safe=False, **kw):
    """
      object_path is a tuple
    """
    db = activity_tool.getSQLConnection()
    path = '/'.join(object_path)
    if invoke:
      invoked = set()
      def invoke(message):
        try:
          key = self.generateMessageUID(message)
          if key in invoked:
            return
          invoked.add(key)
        except AttributeError:
          pass
        line = getattr(message, 'line', None)
        if (line and line.processing_node != -1 or
            self._getExecutableMessageSet(activity_tool, db, [message])):
          # Try to invoke the message - what happens if invoke calls flushActivity ??
          with ActivityRuntimeEnvironment(message):
            activity_tool.invoke(message)
          if message.getExecutionState() != MESSAGE_EXECUTED:
            raise ActivityFlushError('Could not invoke %s on %s'
                                     % (message.method_id, path))
        else:
          raise ActivityFlushError('Could not validate %s on %s'
                                   % (message.method_id, path))
    for m in activity_tool.getRegisteredMessageList(self):
      if object_path == m.object_path and (
         method_id is None or method_id == m.method_id):
        if invoke:
          invoke(m)
        activity_tool.unregisterMessage(self, m)
    uid_list = []
    for line in self._getMessageList(db, path=path,
        **({'method_id': method_id} if method_id else {})):
      if only_safe and line.processing_node > -2:
        continue
      uid_list.append(line.uid)
      if invoke and line.processing_node <= 0:
        invoke(Message.load(line.message, uid=line.uid, line=line))
    if uid_list:
      self.deleteMessageList(db, uid_list)

  # Required for tests
  def timeShift(self, activity_tool, delay, processing_node=None):
    """
      To simulate a time shift, we simply subtract delay from all
      dates in the message(_queue) table.
    """
    activity_tool.getSQLConnection().query("UPDATE %s SET"
      " date = DATE_SUB(date, INTERVAL %s SECOND)"
      % (self.sql_table, delay)
      + ('' if processing_node is None else
         " WHERE processing_node=%s" % processing_node))