##############################################################################
#
# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
#                    Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import sys
30
import transaction
31 32 33
from random import getrandbits
import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY
34 35
from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results
36
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
37
from ZODB.POSException import ConflictError
38
from Products.CMFActivity.ActivityTool import (
39
  Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage)
40
from Products.CMFActivity.ActivityRuntimeEnvironment import (
41
  DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment)
42
from Queue import Queue, VALIDATION_ERROR_DELAY
43
from Products.CMFActivity.Errors import ActivityFlushError
44

45 46 47 48
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000
# NOTE(review): not referenced in this chunk; presumably a processing_node
# value marking messages in a permanent error state — confirm with callers.
INVOKE_ERROR_STATE = -2
# Activity uids are stored as 64 bits unsigned integers.
# No need to depend on a database that supports unsigned integers.
# Numbers are far big enough without using the MSb. Assuming a busy activity
# table having one million activities, the probability of triggering a conflict
# when inserting one activity with 64 bits uid is 0.5e-13. With 63 bits it
# increases to 1e-13, which is still very low.
UID_SAFE_BITSIZE = 63
# Inserting an activity batch of 100 activities among one million existing
# activities has a probability of failing of 1e-11. While it should be low
# enough, retries can help lower that. Try 10 times, which should be short
# enough while yielding one order of magnitude collision probability
# improvement.
UID_ALLOCATION_TRY_COUNT = 10
63

64
def sort_message_key(message):
  """Sort key for loaded messages: (priority, date, uid).

  Must stay in sync with the ORDER BY clause of SQLBase._getMessageList.
  """
  line = message.line
  return (line.priority, line.date, message.uid)

68
# Unique sentinel raised by dequeueMessage when at least one message failed:
# it aborts the invocation transaction without logging a redundant traceback
# (dequeueMessage checks identity against it in its except clause).
_DequeueMessageException = Exception()
69

70 71 72
def render_datetime(x):
  """Render DateTime `x` as a UTC 'YYYY-MM-DD HH:MM:SS.ffffff' string,
  suitable for a MySQL DATETIME(6) column."""
  year, month, day, hour, minute, second = x.toZone('UTC').parts()[:6]
  return "%.4d-%.2d-%.2d %.2d:%.2d:%09.6f" % (
    year, month, day, hour, minute, second)

73 74 75 76 77 78 79 80 81 82 83 84 85
# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict():
  """Build the mapping of condition name -> SQL renderer.

  Each renderer takes (value, render_string) where render_string is the
  SQL string-quoting function, and returns one SQL boolean expression.
  """
  sqltest_dict = {}
  # Values of these types are rendered without quoting.
  no_quote_type = int, float, long
  def _(name, column=None, op="="):
    # Register a renderer for condition `name` on `column` (default: same
    # name) using comparison operator `op`.
    if column is None:
      column = name
    column_op = "%s %s " % (column, op)
    def render(value, render_string):
      if isinstance(value, no_quote_type):
        return column_op + str(value)
      if isinstance(value, DateTime):
        value = render_datetime(value)
      if isinstance(value, basestring):
        return column_op + render_string(value)
      # Below this point, value is expected to be None or an iterable.
      assert op == "=", value
      if value is None: # XXX: see comment in SQLBase._getMessageList
        return column + " IS NULL"
      # The loop body returns on the first element: a non-empty iterable
      # renders an IN clause over all elements...
      for x in value:
        return "%s IN (%s)" % (column, ', '.join(map(
          str if isinstance(x, no_quote_type) else
          render_datetime if isinstance(x, DateTime) else
          render_string, value)))
      # ...and an empty one falls through to an always-false condition.
      return "0"
    sqltest_dict[name] = render
  _('active_process_uid')
  _('group_method_id')
  _('method_id')
  _('path')
  _('processing_node')
  _('serialization_tag')
  _('tag')
  _('retry')
  _('to_date', column="date", op="<=")
  _('uid')
  def renderAbovePriorityDateUid(value, render_string):
    # Strictly dependent on _getMessageList's sort order: given a well-ordered
    # list of values, rendered condition will match the immediate next row in
    # that sort order.
    priority, date, uid = value
    assert isinstance(priority, no_quote_type)
    assert isinstance(uid, no_quote_type)
    return (
        '(priority>%(priority)s OR (priority=%(priority)s AND '
          '(date>%(date)s OR (date=%(date)s AND uid>%(uid)s))'
        '))' % {
        'priority': priority,
        # render_datetime raises if "date" lacks date API, so no need to check
        'date': render_string(render_datetime(date)),
        'uid': uid,
      }
    )
  sqltest_dict['above_priority_date_uid'] = renderAbovePriorityDateUid
  return sqltest_dict
# Replace the factory by its result: sqltest_dict is the dict from now on.
sqltest_dict = sqltest_dict()

130 131 132 133 134 135 136 137
def getNow(db):
  """
    Return the UTC date from the point of view of the SQL server.
    Note that this value is not cached, and is not transactionnal on MySQL
    side.
  """
  result = db.query("SELECT UTC_TIMESTAMP(6)", 0)
  return result[1][0][0]

138
class SQLBase(Queue):
139 140 141
  """
    Define a set of common methods for SQL-based storage of activities.
  """
142 143 144 145 146 147 148 149 150 151
  def createTableSQL(self):
    """Return the CREATE TABLE statement for this queue's activity table.

    Indices match the access patterns of getPriority /
    getReservedMessageList (by processing_node, node, group_method_id,
    priority, date) and of the validation queries (path, tag, ...).
    """
    return """\
CREATE TABLE %s (
  `uid` BIGINT UNSIGNED NOT NULL,
  `date` DATETIME(6) NOT NULL,
  `path` VARCHAR(255) NOT NULL,
  `active_process_uid` INT UNSIGNED NULL,
  `method_id` VARCHAR(255) NOT NULL,
  `processing_node` SMALLINT NOT NULL DEFAULT -1,
  `priority` TINYINT NOT NULL DEFAULT 0,
  `node` SMALLINT NOT NULL DEFAULT 0,
  `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
  `tag` VARCHAR(255) NOT NULL,
  `serialization_tag` VARCHAR(255) NOT NULL,
  `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
  `message` LONGBLOB NOT NULL,
  PRIMARY KEY (`uid`),
  KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
  KEY `node2_priority_date` (`processing_node`, `node`, `priority`, `date`),
  KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
  KEY `node2_group_priority_date` (`processing_node`, `node`, `group_method_id`, `priority`, `date`),
  KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
  KEY (`path`),
  KEY (`active_process_uid`),
  KEY (`method_id`),
  KEY (`tag`)
) ENGINE=InnoDB""" % self.sql_table
169

170
  def initialize(self, activity_tool, clear):
    """Create or upgrade the activity table, and precompute the maximum
    payload of a single multi-row INSERT (see prepareQueueMessageList).
    When `clear` is true, any existing table is dropped first.
    """
    db = activity_tool.getSQLConnection()
    create = self.createTableSQL()
    if clear:
      db.query("DROP TABLE IF EXISTS " + self.sql_table)
      db.query(create)
    else:
      src = db.upgradeSchema(create, create_if_not_exists=1,
                                     initialize=self._initialize)
      if src:
        LOG('CMFActivity', INFO, "%r table upgraded\n%s"
            % (self.sql_table, src))
    # Bytes available for row values in one INSERT: the packet limit, minus
    # the template overhead, plus one separator (the first row has none).
    self._insert_max_payload = (db.getMaxAllowedPacket()
      + len(self._insert_separator)
      - len(self._insert_template % (self.sql_table, '')))
185 186 187 188 189

  def _initialize(self, db, column_list):
      # upgradeSchema callback: called when new columns were added to a
      # non-empty table. Their content cannot be guessed, so just log loudly.
      LOG('CMFActivity', ERROR, "Non-empty %r table upgraded."
          " The following added columns could not be initialized: %s"
          % (self.sql_table, ", ".join(column_list)))
190

191 192
  # Template and row separator used by prepareQueueMessageList to build
  # multi-row INSERT statements; uids come from the server-side @uid variable.
  _insert_template = ("INSERT INTO %s (uid,"
    " path, active_process_uid, date, method_id, processing_node,"
    " priority, node, group_method_id, tag, serialization_tag,"
    " message) VALUES\n(%s)")
  _insert_separator = "),\n("

197
  def prepareQueueMessageList(self, activity_tool, message_list):
    """Insert all registered messages into the activity table.

    Rows are batched into as few INSERT statements as max_allowed_packet
    permits. uids are drawn randomly via a server-side @uid variable and
    re-drawn on duplicate-key errors, up to UID_ALLOCATION_TRY_COUNT times.
    """
    db = activity_tool.getSQLConnection()
    quote = db.string_literal
    def insert(reset_uid):
      # Flush accumulated rows (values_list is consumed) in one INSERT,
      # retrying with a fresh random uid base on collision.
      values = self._insert_separator.join(values_list)
      del values_list[:]
      for _ in xrange(UID_ALLOCATION_TRY_COUNT):
        if reset_uid:
          reset_uid = False
          # Overflow will result into IntegrityError.
          db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE))
        try:
          db.query(self._insert_template % (self.sql_table, values))
        except MySQLdb.IntegrityError, (code, _):
          if code != DUP_ENTRY:
            raise
          # uid collision: pick a new random base and retry.
          reset_uid = True
        else:
          break
      else:
        raise RuntimeError("Maximum retry for prepareQueueMessageList reached")
    i = 0
    reset_uid = True
    values_list = []
    max_payload = self._insert_max_payload
    sep_len = len(self._insert_separator)
    for m in message_list:
      if m.is_registered:
        active_process_uid = m.active_process_uid
        order_validation_text = m.order_validation_text = \
          self.getOrderValidationText(m)
        date = m.activity_kw.get('at_date')
        row = ','.join((
          '@uid+%s' % i,
          quote('/'.join(m.object_path)),
          'NULL' if active_process_uid is None else str(active_process_uid),
          "UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
          quote(m.method_id),
          # 'none' order validation: directly validated (processing_node=0).
          '0' if order_validation_text == 'none' else '-1',
          str(m.activity_kw.get('priority', 1)),
          str(m.activity_kw.get('node', 0)),
          quote(m.getGroupId()),
          quote(m.activity_kw.get('tag', '')),
          quote(m.activity_kw.get('serialization_tag', '')),
          quote(Message.dump(m))))
        i += 1
        n = sep_len + len(row)
        max_payload -= n
        if max_payload < 0:
          # This row would overflow the current packet: flush first.
          if values_list:
            insert(reset_uid)
            reset_uid = False
            max_payload = self._insert_max_payload - n
          else:
            raise ValueError("max_allowed_packet too small to insert message")
        values_list.append(row)
    if values_list:
      insert(reset_uid)
255

256
  def _getMessageList(self, db, count=1000, src__=0, **kw):
    """Select up to `count` raw rows matching the **kw conditions, ordered
    by (priority, date, uid). Return the SQL text instead when src__ is set.
    """
    # XXX: Because most columns have NOT NULL constraint, conditions with None
    #      value should be ignored, instead of trying to render them
    #      (with comparisons with NULL).
    quote = db.string_literal
    where = '\n  AND '.join(sqltest_dict[k](v, quote)
                            for k, v in kw.iteritems())
    if where:
      where = '\nWHERE ' + where
    limit = '' if count is None else '\nLIMIT %d' % count
    sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
      self.sql_table, where, limit)
    if src__:
      return sql
    return Results(db.query(sql, max_rows=0))
268

269 270
  def getMessageList(self, activity_tool, *args, **kw):
    """Load matching rows as Message objects (or return SQL when src__=1)."""
    result = self._getMessageList(activity_tool.getSQLConnection(),
                                  *args, **kw)
    if type(result) is str: # src__ == 1
      return result,
    activity = self.__class__.__name__
    message_list = []
    for line in result:
      message_list.append(Message.load(line.message,
                                       activity=activity,
                                       uid=line.uid,
                                       processing_node=line.processing_node,
                                       retry=line.retry))
    return message_list
280

281 282 283 284 285 286 287 288 289 290 291 292 293 294
  def countMessageSQL(self, quote, **kw):
    """Build a query counting messages with processing_node > -10 that match
    the truthy **kw conditions (all conditions ANDed together)."""
    condition_list = []
    for key, value in kw.iteritems():
      if value:
        condition_list.append(sqltest_dict[key](value, quote))
    return "SELECT count(*) FROM %s WHERE processing_node > -10 AND %s" % (
      self.sql_table, " AND ".join(condition_list) or "1")

  def hasActivitySQL(self, quote, only_valid=False, only_invalid=False, **kw):
    """Build a query returning one row if any message matches.

    only_valid restricts to processing_node > -2, only_invalid to
    processing_node < -1; truthy **kw conditions are ANDed in as well.
    """
    condition_list = []
    for key, value in kw.iteritems():
      if value:
        condition_list.append(sqltest_dict[key](value, quote))
    if only_valid:
      condition_list.append('processing_node > -2')
    if only_invalid:
      condition_list.append('processing_node < -1')
    condition = " AND ".join(condition_list) or "1"
    return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (self.sql_table, condition)
295

296 297
  def getPriority(self, activity_tool, processing_node, node_set=None):
    """Return the (effective_priority, date) row of the best pending message
    (validated: processing_node=0, and due), or defer to Queue.getPriority
    when the table has none.

    With a node_set, effective_priority is 3*priority adjusted by node
    affinity: -1 for this node or a family node, +1 for other dedicated
    nodes, unchanged for node=0 (any node).
    """
    if node_set is None:
      q = ("SELECT 3*priority, date FROM %s"
        " WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
        " ORDER BY priority, date LIMIT 1" % self.sql_table)
    else:
      subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
        " WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
        " ORDER BY priority, date LIMIT 1)" % self.sql_table).format
      node = 'node=%s' % processing_node
      # "ALL" on all but one, to incur deduplication cost only once.
      # "UNION ALL" between the two naturally distinct sets.
      q = ("SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
        " ORDER BY effective_priority, date LIMIT 1" % (
          subquery(-1, node),
          subquery('', 'node=0'),
          subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
          ' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
        ))
    result = activity_tool.getSQLConnection().query(q, 0)[1]
    if result:
      return result[0]
    return Queue.getPriority(self, activity_tool, processing_node, node_set)
319

320
  def _retryOnLockError(self, method, args=(), kw=None):
    """Call method(*args, **kw), retrying forever on database lock errors.

    Note that this code assumes that a database adapter translates
    a lock error into a ConflictError.
    """
    # Use a None sentinel instead of a mutable {} default, which would be
    # a single dict object shared across all calls.
    if kw is None:
      kw = {}
    while True:
      try:
        return method(*args, **kw)
      except ConflictError:
        LOG('SQLBase', INFO, 'Got a lock error, retrying...')
328

329
  # Validation private methods
330
  def getValidationSQL(self, quote, activate_kw, same_queue):
    """Return a SELECT fetching the messages that the given activate_kw
    dependency conditions wait upon, or None when there is nothing to
    render. `quote` is the SQL string-quoting function; `same_queue`
    raises the row limit from 1 to READ_MESSAGE_LIMIT.
    """
    validate_list = []
    for k, v in activate_kw.iteritems():
      if v is not None:
        try:
          # Each supported dependency has a _validate_<name> renderer.
          method = getattr(self, '_validate_' + k, None)
          if method:
            validate_list.append(' AND '.join(method(v, quote)))
        except Exception:
          LOG('CMFActivity', WARNING, 'invalid %s value: %r' % (k, v),
              error=True)
          # Prevent validation by depending on anything, at least itself.
          # (note: rebinds validate_list to the 1-tuple ('1',))
          validate_list = '1',
          same_queue = False
          break
    if validate_list:
      return ("SELECT '%s' as activity, uid, date, processing_node,"
              " priority, group_method_id, message FROM %s"
              " WHERE processing_node > -10 AND (%s) LIMIT %s" % (
                type(self).__name__, self.sql_table,
                ' OR '.join(validate_list),
                READ_MESSAGE_LIMIT if same_queue else 1))
352

353 354
  def _validate_after_method_id(self, *args):
    """1-tuple of conditions: depend on messages with given method_id(s)."""
    return (sqltest_dict['method_id'](*args),)
355

356 357
  def _validate_after_path(self, *args):
    """1-tuple of conditions: depend on messages with given path(s)."""
    return (sqltest_dict['path'](*args),)
358

359 360
  def _validate_after_message_uid(self, *args):
    """1-tuple of conditions: depend on messages with given uid(s)."""
    return (sqltest_dict['uid'](*args),)
361

362 363 364 365
  def _validate_after_path_and_method_id(self, value, quote):
    """Conditions: depend on messages matching both path and method_id."""
    path, method_id = value
    method_id_cond = sqltest_dict['method_id'](method_id, quote)
    path_cond = sqltest_dict['path'](path, quote)
    return (method_id_cond, path_cond)
366

367 368
  def _validate_after_tag(self, *args):
    """1-tuple of conditions: depend on messages with given tag(s)."""
    return (sqltest_dict['tag'](*args),)
369

370 371 372 373
  def _validate_after_tag_and_method_id(self, value, quote):
    """Conditions: depend on messages matching both tag and method_id."""
    tag, method_id = value
    method_id_cond = sqltest_dict['method_id'](method_id, quote)
    tag_cond = sqltest_dict['tag'](tag, quote)
    return (method_id_cond, tag_cond)
374

375 376
  def _validate_serialization_tag(self, *args):
    """Conditions: depend on already-validated messages (processing_node
    > -1) carrying the given serialization_tag."""
    tag_cond = sqltest_dict['serialization_tag'](*args)
    return ('processing_node > -1', tag_cond)
377 378 379 380 381

  def _log(self, severity, summary):
    """Log `summary` under this class's name, attaching the current
    exception info for severities above INFO."""
    error = sys.exc_info() if severity > INFO else None
    LOG(self.__class__.__name__, severity, summary, error=error)

382
  def distribute(self, activity_tool, node_count):
    """Validate pending messages (processing_node=-1) in batches of
    READ_MESSAGE_LIMIT rows, assigning distributable ones to
    processing_node 0, until none remain or MAX_VALIDATED_LIMIT messages
    were validated. `node_count` is unused by this implementation.
    """
    db = activity_tool.getSQLConnection()
    now_date = getNow(db)
    where_kw = {
      'processing_node': -1,
      'to_date': now_date,
      'count': READ_MESSAGE_LIMIT,
    }
    validated_count = 0
    while 1:
      result = self._getMessageList(db, **where_kw)
      if not result:
        return
      transaction.commit()

      validation_text_dict = {'none': 1}
      message_dict = {}
      for line in result:
        message = Message.load(line.message, uid=line.uid, line=line)
        if not hasattr(message, 'order_validation_text'): # BBB
          message.order_validation_text = self.getOrderValidationText(message)
        self.getExecutableMessageList(activity_tool, message, message_dict,
                                      validation_text_dict, now_date=now_date)
      transaction.commit()
      if message_dict:
        distributable_uid_set = set()
        serialization_tag_dict = {}
        for message in message_dict.itervalues():
          serialization_tag = message.activity_kw.get('serialization_tag')
          if serialization_tag is None:
            distributable_uid_set.add(message.uid)
          else:
            serialization_tag_dict.setdefault(serialization_tag,
                                              []).append(message)
        # For each serialization tag, only one message may run at a time,
        # except when grouped: same-group messages can go together.
        for message_list in serialization_tag_dict.itervalues():
          # Sort list of messages to validate the message with highest score
          message_list.sort(key=sort_message_key)
          distributable_uid_set.add(message_list[0].uid)
          group_method_id = message_list[0].line.group_method_id
          if group_method_id == '\0':
            continue
          for message in message_list[1:]:
            if group_method_id == message.line.group_method_id:
              distributable_uid_set.add(message.uid)
        distributable_count = len(distributable_uid_set)
        if distributable_count:
          self.assignMessageList(db, 0, distributable_uid_set)
          validated_count += distributable_count
          if validated_count >= MAX_VALIDATED_LIMIT:
            return
      # `line` still holds the last row of this batch: resume just after it.
      where_kw['above_priority_date_uid'] = (line.priority, line.date, line.uid)
433

434
  def getReservedMessageList(self, db, date, processing_node, limit,
                             group_method_id=None, node_set=None):
    """
      Get and reserve a list of messages.
      limit
        Maximum number of messages to fetch.
        This number is not guaranteed to be reached, because of not enough
        messages being pending execution.
    """
    assert limit
    quote = db.string_literal
    query = db.query
    args = (self.sql_table, sqltest_dict['to_date'](date, quote),
            ' AND group_method_id=' + quote(group_method_id)
            if group_method_id else '' , limit)
    # Get reservable messages.
    # During normal operation, sorting by date (as last criteria) is fairer
    # for users and reduce the probability to do the same work several times
    # (think of an object that is modified several times in a short period of
    # time).
    if node_set is None:
      result = Results(query(
        "SELECT * FROM %s WHERE processing_node=0 AND %s%s"
        " ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
    else:
      # We'd like to write
      #   ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
      # but this makes indices inefficient.
      subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
        " WHERE {} AND processing_node=0 AND %s%s"
        " ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
      node = 'node=%s' % processing_node
      result = Results(query(
        # "ALL" on all but one, to incur deduplication cost only once.
        # "UNION ALL" between the two naturally distinct sets.
        "SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
        " ORDER BY effective_priority, date LIMIT %s"% (
            subquery(-1, node),
            subquery('', 'node=0'),
            subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
            ' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
            limit), 0))
    if result:
      # Reserve messages.
      uid_list = [x.uid for x in result]
      self.assignMessageList(db, processing_node, uid_list)
      self._log(TRACE, 'Reserved messages: %r' % uid_list)
      return result
    return ()

  def assignMessageList(self, db, state, uid_list):
    """
      Set processing_node to `state` for all given message uids, and
      COMMIT immediately.
    """
    uids = ','.join(map(str, uid_list))
    db.query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT"
             % (self.sql_table, state, uids))
491

492
  def getProcessableMessageLoader(self, db, processing_node):
    """Return a loader mapping a SQL row to (message, uid, ()).

    This base implementation never merges duplicates, hence the constant
    empty tuple of absorbed uids.
    """
    def load(line):
      uid = line.uid
      return Message.load(line.message, uid=uid, line=line), uid, ()
    return load

500 501
  def getProcessableMessageList(self, activity_tool, processing_node,
                                node_family_id_list):
    """
      Always true:
        For each reserved message, delete redundant messages when it gets
        reserved (definitely lost, but they are expandable since redundant).

      - reserve a message
      - if this message has a group_method_id:
        - reserve a bunch of messages
        - until the total "cost" of the group goes over 1
          - get one message from the reserved bunch (this messages will be
            "needed")
          - update the total cost
        - unreserve "unneeded" messages
      - return still-reserved message list and a group_method_id

      If any error happens in above described process, try to unreserve all
      messages already reserved in that process.
      If it fails, complain loudly that some messages might still be in an
      unclean state.

      Returned values:
        3-tuple:
          - list of messages
          - group_method_id
          - uid_to_duplicate_uid_list_dict
    """
    db = activity_tool.getSQLConnection()
    now_date = getNow(db)
    uid_to_duplicate_uid_list_dict = {}
    try:
      while 1: # not a loop
        # Select messages that were either assigned manually or left
        # unprocessed after a shutdown. Most of the time, there's none.
        # To minimize the probability of deadlocks, we also COMMIT so that a
        # new transaction starts on the first 'FOR UPDATE' query, which is all
        # the more important as the current on started with getPriority().
        result = db.query("SELECT * FROM %s WHERE processing_node=%s"
          " ORDER BY priority, date LIMIT 1\0COMMIT" % (
          self.sql_table, processing_node), 0)
        already_assigned = result[1]
        if already_assigned:
          result = Results(result)
        else:
          result = self.getReservedMessageList(db, now_date, processing_node,
                                               1, node_set=node_family_id_list)
          if not result:
            break
        load = self.getProcessableMessageLoader(db, processing_node)
        m, uid, uid_list = load(result[0])
        message_list = [m]
        uid_to_duplicate_uid_list_dict[uid] = uid_list
        group_method_id = m.line.group_method_id
        if group_method_id != '\0':
          # Count the number of objects to prevent too many objects.
          cost = m.activity_kw.get('group_method_cost', .01)
          assert 0 < cost <= 1, (self.sql_table, uid)
          count = m.getObjectCount(activity_tool)
          # this is heuristic (messages with same group_method_id
          # are likely to have the same group_method_cost)
          limit = int(1. / cost + 1 - count)
          if limit > 1: # <=> cost * count < 1
            cost *= count
            # Retrieve objects which have the same group method.
            result = iter(already_assigned
              and Results(db.query("SELECT * FROM %s"
                " WHERE processing_node=%s AND group_method_id=%s"
                " ORDER BY priority, date LIMIT %s" % (
                self.sql_table, processing_node,
                db.string_literal(group_method_id), limit), 0))
                # Do not optimize rare case: keep the code simple by not
                # adding more results from getReservedMessageList if the
                # limit is not reached.
              or self.getReservedMessageList(db, now_date, processing_node,
                limit, group_method_id, node_family_id_list))
            for line in result:
              if line.uid in uid_to_duplicate_uid_list_dict:
                continue
              m, uid, uid_list = load(line)
              if m is None:
                # loader absorbed this row as a duplicate of `uid`.
                uid_to_duplicate_uid_list_dict[uid] += uid_list
                continue
              uid_to_duplicate_uid_list_dict[uid] = uid_list
              cost += m.getObjectCount(activity_tool) * \
                      m.activity_kw.get('group_method_cost', .01)
              message_list.append(m)
              if cost >= 1:
                # Unreserve extra messages as soon as possible.
                uid_list = [line.uid for line in result if line.uid != uid]
                if uid_list:
                  self.assignMessageList(db, 0, uid_list)
        return message_list, group_method_id, uid_to_duplicate_uid_list_dict
    except:
      self._log(WARNING, 'Exception while reserving messages.')
      if uid_to_duplicate_uid_list_dict:
        to_free_uid_list = uid_to_duplicate_uid_list_dict.keys()
        for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
          to_free_uid_list += uid_list
        try:
          self.assignMessageList(db, 0, to_free_uid_list)
        except:
          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
        else:
          if to_free_uid_list:
            self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
      else:
        self._log(TRACE, '(no message was reserved)')
    return (), None, None
609

610 611 612 613 614 615 616 617 618
  def _abort(self):
    """Abort the current transaction, logging a PANIC and re-raising if
    even the abort fails."""
    try:
      transaction.abort()
    except:
      # Unfortunately, database adapters may raise an exception against abort.
      self._log(PANIC,
          'abort failed, thus some objects may be modified accidentally')
      raise

619
  # Queue semantic
620 621
  def dequeueMessage(self, activity_tool, processing_node,
                     node_family_id_list):
    """Fetch one processable message (or group of messages), invoke it,
    then finalize its execution. Return True when there was nothing to do.
    """
    message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
      self.getProcessableMessageList(activity_tool, processing_node,
        node_family_id_list)
    if message_list:
      # Remove group_id parameter from group_method_id
      group_method_id = group_method_id.split('\0')[0]
      if group_method_id != "":
        method = activity_tool.invokeGroup
        args = (group_method_id, message_list, self.__class__.__name__,
                hasattr(self, 'generateMessageUID'))
        activity_runtime_environment = ActivityRuntimeEnvironment(None)
      else:
        method = activity_tool.invoke
        message = message_list[0]
        args = (message, )
        activity_runtime_environment = ActivityRuntimeEnvironment(message)
      # Commit right before executing messages.
      # As MySQL transaction does not start exactly at the same time as ZODB
      # transactions but a bit later, messages available might be called
      # on objects which are not available - or available in an old
      # version - to ZODB connector.
      # So all connectors must be committed now that we have selected
      # everything needed from MySQL to get a fresh view of ZODB objects.
      transaction.commit()
      transaction.begin()
      # Try to invoke
      try:
        with activity_runtime_environment:
          method(*args)
        # Abort if at least 1 message failed. On next tic, only those that
        # succeeded will be selected because their at_date won't have been
        # increased.
        for m in message_list:
          if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
            raise _DequeueMessageException
        transaction.commit()
      except:
        exc_info = sys.exc_info()
        # Do not log the (expected) sentinel used to abort on failure.
        if exc_info[1] is not _DequeueMessageException:
          self._log(WARNING,
            'Exception raised when invoking messages (uid, path, method_id) %r'
            % [(m.uid, m.object_path, m.method_id) for m in message_list])
          for m in message_list:
            m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        self._abort()
        exc_info = message_list[0].exc_info
        if exc_info:
          try:
            # Register it again.
            with activity_runtime_environment:
              cancel = message.on_error_callback(*exc_info)
            del exc_info, message.exc_info
            transaction.commit()
            if cancel:
              message.setExecutionState(MESSAGE_EXECUTED)
          except:
            self._log(WARNING, 'Exception raised when processing error callbacks')
            message.setExecutionState(MESSAGE_NOT_EXECUTED)
            self._abort()
      self.finalizeMessageExecution(activity_tool, message_list,
                                    uid_to_duplicate_uid_list_dict)
    transaction.commit()
    return not message_list

686 687 688 689 690 691 692 693 694 695 696 697
  def deleteMessageList(self, db, uid_list):
    """Remove the given messages from the activity table."""
    uids = ','.join(map(str, uid_list))
    db.query("DELETE FROM %s WHERE uid IN (%s)" % (self.sql_table, uids))

  def reactivateMessageList(self, db, uid_list, delay, retry):
    """Postpone the given messages by `delay` seconds; when `retry` is
    true, also increment their priority and retry counters."""
    retry_clause = (
      ", priority = priority + 1, retry = retry + 1" if retry else "")
    db.query("UPDATE %s SET"
      " date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)"
      "%s WHERE uid IN (%s)" % (
        self.sql_table, delay, retry_clause,
        ",".join(map(str, uid_list))))

  def finalizeMessageExecution(self, activity_tool, message_list,
                               uid_to_duplicate_uid_list_dict=None):
    """
      If everything was fine, delete all messages.
      If anything failed, make successful messages available (if any), and
      the following rules apply to failed messages:
        - Failures due to ConflictErrors cause messages to be postponed,
          but their retry count is *not* increased.
        - Failures of messages already above maximum retry count cause them to
          be put in a permanent-error state.
        - In all other cases, retry count is increased and message is delayed.

      uid_to_duplicate_uid_list_dict, when given, maps a message uid to the
      uids of its merged duplicates; duplicates follow the fate of their
      original (deleted on success, made available again otherwise).
    """
    db = activity_tool.getSQLConnection()
    deletable_uid_list = []
    delay_uid_list = []
    final_error_uid_list = []
    make_available_uid_list = []
    notify_user_list = []
    # `executed_uid_list` aliases one of the lists above: by default,
    # executed messages are deleted, but when duplicates are tracked and at
    # least one message failed, executed ones are made available again
    # instead of being deleted.
    executed_uid_list = deletable_uid_list
    if uid_to_duplicate_uid_list_dict is not None:
      for m in message_list:
        if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
          executed_uid_list = make_available_uid_list
          break
    for m in message_list:
      uid = m.uid
      if m.getExecutionState() == MESSAGE_EXECUTED:
        executed_uid_list.append(uid)
        if uid_to_duplicate_uid_list_dict is not None:
          executed_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
      elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
        # Should duplicate messages follow strictly the original message, or
        # should they be just made available again ?
        if uid_to_duplicate_uid_list_dict is not None:
          make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
        # Postpone without increasing the retry count when the failure is a
        # ConflictError for which retry-on-conflict is enabled, or when the
        # message was skipped.
        if (m.exc_type and # m.exc_type may be None
            (m.conflict_retry if issubclass(m.exc_type, ConflictError) else
             m.exc_type is SkippedMessage)):
          delay_uid_list.append(uid)
        else:
          max_retry = m.max_retry
          retry = m.line.retry
          if max_retry is not None and retry >= max_retry:
            # Always notify when we stop retrying.
            notify_user_list.append((m, False))
            final_error_uid_list.append(uid)
            continue
          # In case of infinite retry, notify the user
          # when the default limit is reached.
          if max_retry is None and retry == DEFAULT_MAX_RETRY:
            notify_user_list.append((m, True))
          delay = m.delay
          if delay is None:
            # By default, make delay quadratic to the number of retries.
            delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) * 2
          try:
            # Immediately update, because values different for every message
            self.reactivateMessageList(db, (uid,), delay, True)
          except:
            # Best-effort: a failed reactivation is only logged.
            self._log(WARNING, 'Failed to reactivate %r' % uid)
        make_available_uid_list.append(uid)
      else: # MESSAGE_NOT_EXECUTABLE
        # 'path' does not point to any object. Activities are normally flushed
        # (without invoking them) when an object is deleted, but this is only
        # an optimisation. There is no efficient and reliable way to do such
        # this, because a concurrent and very long transaction may be about to
        # activate this object, without conflict.
        # So we have to clean up any remaining activity.
        deletable_uid_list.append(uid)
    if deletable_uid_list:
      try:
        self._retryOnLockError(self.deleteMessageList, (db, deletable_uid_list))
      except:
        self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
      else:
        self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
    if delay_uid_list:
      try:
        # If this is a conflict error, do not increase 'retry' but only delay.
        self.reactivateMessageList(db, delay_uid_list,
                                   VALIDATION_ERROR_DELAY, False)
      except:
        self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
    if final_error_uid_list:
      try:
        self.assignMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list)
      except:
        self._log(ERROR, 'Failed to set message to error state for %r'
                         % final_error_uid_list)
    if make_available_uid_list:
      try:
        # processing_node 0 presumably means "available for processing
        # again" — confirm against the dequeue logic.
        self.assignMessageList(db, 0, make_available_uid_list)
      except:
        self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
      else:
        self._log(TRACE, 'Freed messages %r' % make_available_uid_list)
    try:
      for m, retry in notify_user_list:
        m.notifyUser(activity_tool, retry)
    except:
      # Notification failures must not cause this method to raise.
      self._log(WARNING,
        'Exception during notification phase of finalizeMessageExecution')

  def flush(self, activity_tool, object_path, invoke=0, method_id=None, **kw):
    """
      object_path is a tuple.

      Remove all messages for `object_path` (optionally restricted to
      `method_id`) from both the in-memory registration list and the SQL
      queue table. When `invoke` is true, each flushed message is executed
      first; a message that cannot be executed or validated raises
      ActivityFlushError.
    """
    path = '/'.join(object_path)
    if invoke:
      invoked = set()
      # NOTE: this local function deliberately shadows the `invoke` flag
      # parameter, which is only tested for truth from here on.
      def invoke(message):
        try:
          # Deduplicate: skip messages whose UID was already invoked.
          # Activities without generateMessageUID (AttributeError) are
          # invoked unconditionally.
          key = self.generateMessageUID(message)
          if key in invoked:
            return
          invoked.add(key)
        except AttributeError:
          pass
        line = getattr(message, 'line', None)
        # Invoke when the message is already being processed
        # (processing_node != -1) or has no dependent messages blocking it.
        if (line and line.processing_node != -1 or
            not activity_tool.getDependentMessageList(message)):
          # Try to invoke the message - what happens if invoke calls flushActivity ??
          with ActivityRuntimeEnvironment(message):
            activity_tool.invoke(message)
          if message.getExecutionState() != MESSAGE_EXECUTED:
            raise ActivityFlushError('Could not invoke %s on %s'
                                     % (message.method_id, path))
        else:
          raise ActivityFlushError('Could not validate %s on %s'
                                   % (message.method_id, path))
    # First, flush messages still only registered in memory (not yet
    # committed to the SQL table).
    for m in activity_tool.getRegisteredMessageList(self):
      if object_path == m.object_path and (
         method_id is None or method_id == m.method_id):
        if invoke:
          invoke(m)
        activity_tool.unregisterMessage(self, m)
    # Then, flush matching rows from the SQL table.
    uid_list = []
    db = activity_tool.getSQLConnection()
    for line in self._getMessageList(db, path=path,
        **({'method_id': method_id} if method_id else {})):
      uid_list.append(line.uid)
      # processing_node <= 0: presumably rows not currently reserved by a
      # processing node — confirm against the assignment logic.
      if invoke and line.processing_node <= 0:
        invoke(Message.load(line.message, uid=line.uid, line=line))
    if uid_list:
      self.deleteMessageList(db, uid_list)

  # Required for tests
  def timeShift(self, activity_tool, delay, processing_node=None):
    """
      To simulate time shift, we simply subtract `delay` seconds from
      all dates in the message(_queue) table.

      If `processing_node` is given, only rows assigned to that node are
      shifted; otherwise every row is affected.
    """
    # Fix: the WHERE clause used to be concatenated without a leading
    # space ("...SECOND)WHERE..."), which MySQL tolerated only because ')'
    # delimits tokens. Add the space for valid, readable SQL.
    activity_tool.getSQLConnection().query("UPDATE %s SET"
      " date = DATE_SUB(date, INTERVAL %s SECOND)"
      % (self.sql_table, delay)
      + ('' if processing_node is None else
         " WHERE processing_node=%s" % processing_node))