# SQLBase.py
##############################################################################
#
# Copyright (c) 2007 Nexedi SA and Contributors. All Rights Reserved.
#                    Vincent Pelletier <vincent@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import sys
30
import transaction
31 32 33
from random import getrandbits
import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY
34 35
from DateTime import DateTime
from Shared.DC.ZRDB.Results import Results
# Contributor: Julien Muchembled
36
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
37
from ZODB.POSException import ConflictError
38
from Products.CMFActivity.ActivityTool import (
39
  Message, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, SkippedMessage)
40
from Products.CMFActivity.ActivityRuntimeEnvironment import (
41
  DEFAULT_MAX_RETRY, ActivityRuntimeEnvironment)
42
from Queue import Queue, VALIDATION_ERROR_DELAY
43
from Products.CMFActivity.Errors import ActivityFlushError
44 45
from Products.ERP5Type import Timeout
from Products.ERP5Type.Timeout import TimeoutReachedError, Deadline
46

47 48 49 50
# Stop validating more messages when this limit is reached
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate.
READ_MESSAGE_LIMIT = 1000

# processing_node value marking messages in a permanent error state
# (see hasActivitySQL: processing_node < -1 means "invalid").
INVOKE_ERROR_STATE = -2
# Activity uids are stored as 64 bits unsigned integers.
# No need to depend on a database that supports unsigned integers.
# Numbers are far big enough without using the MSb. Assuming a busy activity
# table having one million activities, the probability of triggering a conflict
# when inserting one activity with 64 bits uid is 0.5e-13. With 63 bits it
# increases to 1e-13, which is still very low.
UID_SAFE_BITSIZE = 63
# Inserting an activity batch of 100 activities among one million existing
# activities has a probability of failing of 1e-11. While it should be low
# enough, retries can help lower that. Try 10 times, which should be short
# enough while yielding one order of magnitude collision probability
# improvement.
UID_ALLOCATION_TRY_COUNT = 10
65

66
def sort_message_key(message):
  """Sort key for messages: (priority, date, uid).

  Must remain consistent with the ORDER BY clause used by
  SQLBase._getMessageList.
  """
  line = message.line
  return line.priority, line.date, message.uid

70
# Sentinel exception instance raised by dequeueMessage to jump to the
# error-handling path when at least one invoked message reports
# MESSAGE_NOT_EXECUTED (identity-compared against sys.exc_info()[1]).
_DequeueMessageException = Exception()
71

72 73 74
def render_datetime(x):
  """Render DateTime `x` as a MySQL DATETIME(6)-style string, in UTC."""
  year, month, day, hour, minute, second = x.toZone('UTC').parts()[:6]
  return "%.4d-%.2d-%.2d %.2d:%.2d:%09.6f" % (
    year, month, day, hour, minute, second)

75 76 77 78 79 80 81 82 83 84 85 86 87
# sqltest_dict ({'condition_name': <render_function>}) defines how to render
# condition statements in the SQL query used by SQLBase.getMessageList
def sqltest_dict():
  sqltest_dict = {}
  # Values of these Python types may be embedded in SQL without quoting.
  no_quote_type = int, float, long
  def _(name, column=None, op="="):
    # Register a renderer for condition `name`, comparing `column` (defaults
    # to the condition name) with operator `op`.
    if column is None:
      column = name
    column_op = "%s %s " % (column, op)
    def render(value, render_string):
      # `render_string` is typically the DB connection's string_literal.
      if isinstance(value, no_quote_type):
        return column_op + str(value)
      if isinstance(value, DateTime):
        value = render_datetime(value)
      if isinstance(value, basestring):
        return column_op + render_string(value)
      # Non-scalar values only make sense for equality tests.
      assert op == "=", value
      if value is None: # XXX: see comment in SQLBase._getMessageList
        return column + " IS NULL"
      # `value` is an iterable: render an IN clause. The for+return construct
      # is only a non-emptiness check — the body returns on first iteration.
      for x in value:
        return "%s IN (%s)" % (column, ', '.join(map(
          str if isinstance(x, no_quote_type) else
          render_datetime if isinstance(x, DateTime) else
          render_string, value)))
      # Empty iterable: nothing can match, render an always-false condition.
      return "0"
    sqltest_dict[name] = render
  _('active_process_uid')
  _('group_method_id')
  _('method_id')
  _('path')
  _('processing_node')
  _('serialization_tag')
  _('tag')
  _('retry')
  _('to_date', column="date", op="<=")
  _('uid')
  def renderAbovePriorityDateUid(value, render_string):
    # Strictly dependent on _getMessageList's sort order: given a well-ordered
    # list of values, rendered condition will match the immediate next row in
    # that sort order.
    priority, date, uid = value
    assert isinstance(priority, no_quote_type)
    assert isinstance(uid, no_quote_type)
    return (
        '(priority>%(priority)s OR (priority=%(priority)s AND '
          '(date>%(date)s OR (date=%(date)s AND uid>%(uid)s))'
        '))' % {
        'priority': priority,
        # render_datetime raises if "date" lacks date API, so no need to check
        'date': render_string(render_datetime(date)),
        'uid': uid,
      }
    )
  sqltest_dict['above_priority_date_uid'] = renderAbovePriorityDateUid
  return sqltest_dict
# Replace the factory with its result: a plain dict of renderers.
sqltest_dict = sqltest_dict()

132 133 134 135 136 137 138 139
def getNow(db):
  """Return the current UTC date as seen by the SQL server.

  The value is not cached, and on MySQL it is not transactional either.
  """
  result = db.query("SELECT UTC_TIMESTAMP(6)", 0)
  return result[1][0][0]

140
class SQLBase(Queue):
  """
    Define a set of common methods for SQL-based storage of activities.

    Relies on a `sql_table` attribute naming the MySQL table holding the
    messages (not defined here; presumably set by concrete queue
    subclasses — confirm against SQLDict/SQLQueue).
  """
144 145 146 147 148 149 150 151 152 153
  def createTableSQL(self):
    """Return the CREATE TABLE statement for this queue's activity table."""
    return """\
CREATE TABLE %s (
  `uid` BIGINT UNSIGNED NOT NULL,
  `date` DATETIME(6) NOT NULL,
  `path` VARCHAR(255) NOT NULL,
  `active_process_uid` INT UNSIGNED NULL,
  `method_id` VARCHAR(255) NOT NULL,
  `processing_node` SMALLINT NOT NULL DEFAULT -1,
  `priority` TINYINT NOT NULL DEFAULT 0,
  `node` SMALLINT NOT NULL DEFAULT 0,
  `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
  `tag` VARCHAR(255) NOT NULL,
  `serialization_tag` VARCHAR(255) NOT NULL,
  `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
  `message` LONGBLOB NOT NULL,
  PRIMARY KEY (`uid`),
  KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
  KEY `node2_priority_date` (`processing_node`, `node`, `priority`, `date`),
  KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
  KEY `node2_group_priority_date` (`processing_node`, `node`, `group_method_id`, `priority`, `date`),
  KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
  KEY (`path`),
  KEY (`active_process_uid`),
  KEY (`method_id`),
  KEY (`tag`)
) ENGINE=InnoDB""" % self.sql_table
171

172
  def initialize(self, activity_tool, clear):
    """Create or upgrade the activity table.

    clear -- when true, drop and recreate the table from scratch;
             otherwise upgrade the existing schema in place.

    Also precomputes `_insert_max_payload`, the byte budget for the VALUES
    part of a single INSERT, from the server's max_allowed_packet.
    """
    db = activity_tool.getSQLConnection()
    create = self.createTableSQL()
    if clear:
      db.query("DROP TABLE IF EXISTS " + self.sql_table)
      db.query(create)
    else:
      src = db.upgradeSchema(create, create_if_not_exists=1,
                                     initialize=self._initialize)
      if src:
        LOG('CMFActivity', INFO, "%r table upgraded\n%s"
            % (self.sql_table, src))
    # Packet size minus the constant template overhead; the separator term
    # compensates for the one separator prepareQueueMessageList counts per row.
    self._insert_max_payload = (db.getMaxAllowedPacket()
      + len(self._insert_separator)
      - len(self._insert_template % (self.sql_table, '')))
187 188 189 190 191

  def _initialize(self, db, column_list):
    """Schema-upgrade hook: report columns that could not be backfilled.

    Called by db.upgradeSchema when new columns are added to a non-empty
    table; only logs, it does not attempt any data migration.
    """
    LOG('CMFActivity', ERROR, "Non-empty %r table upgraded."
        " The following added columns could not be initialized: %s"
        % (self.sql_table, ", ".join(column_list)))
192

193 194
  # Skeleton of the batch INSERT statement: first %s is the table name,
  # second is the list of row tuples joined with _insert_separator.
  _insert_template = ("INSERT INTO %s (uid,"
    " path, active_process_uid, date, method_id, processing_node,"
    " priority, node, group_method_id, tag, serialization_tag,"
    " message) VALUES\n(%s)")
  # Separator between two row tuples inside the VALUES clause.
  _insert_separator = "),\n("

199
  def prepareQueueMessageList(self, activity_tool, message_list):
    """Insert registered messages into the activity table in batches.

    Rows are accumulated until the rendered INSERT would exceed the
    server's max_allowed_packet, then flushed. Message uids are allocated
    as `@uid + i` from a random 63-bit base; on a duplicate-key error the
    whole batch is retried with a fresh random base, up to
    UID_ALLOCATION_TRY_COUNT times.
    """
    db = activity_tool.getSQLConnection()
    quote = db.string_literal
    def insert(reset_uid):
      # Flush the pending rows in `values_list` as one INSERT statement.
      values = self._insert_separator.join(values_list)
      del values_list[:]
      for _ in xrange(UID_ALLOCATION_TRY_COUNT):
        if reset_uid:
          reset_uid = False
          # Overflow will result into IntegrityError.
          db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE))
        try:
          db.query(self._insert_template % (self.sql_table, values))
        except MySQLdb.IntegrityError, (code, _):
          if code != DUP_ENTRY:
            raise
          # uid collision: pick a new random base and retry the batch.
          reset_uid = True
        else:
          break
      else:
        raise RuntimeError("Maximum retry for prepareQueueMessageList reached")
    i = 0
    reset_uid = True
    values_list = []
    max_payload = self._insert_max_payload
    sep_len = len(self._insert_separator)
    for m in message_list:
      if m.is_registered:
        active_process_uid = m.active_process_uid
        order_validation_text = m.order_validation_text = \
          self.getOrderValidationText(m)
        date = m.activity_kw.get('at_date')
        # One rendered row tuple, in _insert_template column order.
        row = ','.join((
          '@uid+%s' % i,
          quote('/'.join(m.object_path)),
          'NULL' if active_process_uid is None else str(active_process_uid),
          "UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
          quote(m.method_id),
          # 'none' means no validation needed: directly distributable (0),
          # otherwise the message starts in the to-validate state (-1).
          '0' if order_validation_text == 'none' else '-1',
          str(m.activity_kw.get('priority', 1)),
          str(m.activity_kw.get('node', 0)),
          quote(m.getGroupId()),
          quote(m.activity_kw.get('tag', '')),
          quote(m.activity_kw.get('serialization_tag', '')),
          quote(Message.dump(m))))
        i += 1
        n = sep_len + len(row)
        max_payload -= n
        if max_payload < 0:
          # Adding this row would exceed max_allowed_packet: flush first.
          if values_list:
            insert(reset_uid)
            reset_uid = False
            max_payload = self._insert_max_payload - n
          else:
            raise ValueError("max_allowed_packet too small to insert message")
        values_list.append(row)
    if values_list:
      insert(reset_uid)
257

258
  def _getMessageList(self, db, count=1000, src__=0, **kw):
    """Select up to `count` messages matching the conditions in `kw`.

    Each keyword is rendered through sqltest_dict; rows are sorted by
    (priority, date, uid). With src__ true, return the SQL text instead
    of executing it. `count=None` removes the LIMIT clause.
    """
    # XXX: Because most columns have NOT NULL constraint, conditions with None
    #      value should be ignored, instead of trying to render them
    #      (with comparisons with NULL).
    quote = db.string_literal
    where_clause = '\n  AND '.join(
      sqltest_dict[name](value, quote) for name, value in kw.iteritems())
    sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % (
      self.sql_table,
      where_clause and '\nWHERE ' + where_clause,
      '' if count is None else '\nLIMIT %d' % count,
    )
    if src__:
      return sql
    return Results(db.query(sql, max_rows=0))
270

271 272
  def getMessageList(self, activity_tool, *args, **kw):
    """Return the selected rows loaded as Message objects.

    With src__ true (forwarded to _getMessageList), return a 1-tuple
    holding the SQL text instead.
    """
    result = self._getMessageList(activity_tool.getSQLConnection(), *args, **kw)
    if type(result) is str: # src__ == 1
      return result,
    activity_name = self.__class__.__name__
    message_list = []
    for line in result:
      message_list.append(Message.load(line.message,
                                       activity=activity_name,
                                       uid=line.uid,
                                       processing_node=line.processing_node,
                                       retry=line.retry))
    return message_list
282

283 284 285 286 287 288 289 290 291 292 293 294 295 296
  def countMessageSQL(self, quote, **kw):
    """Render a COUNT query over messages not in a hard-failure state.

    Falsy condition values are skipped; with no condition at all the
    WHERE clause degenerates to "... AND 1".
    """
    condition_list = [
      sqltest_dict[name](value, quote)
      for name, value in kw.iteritems() if value]
    return "SELECT count(*) FROM %s WHERE processing_node > -10 AND %s" % (
      self.sql_table, " AND ".join(condition_list) or "1")

  def hasActivitySQL(self, quote, only_valid=False, only_invalid=False, **kw):
    """Render an existence query (LIMIT 1) for matching messages.

    only_valid restricts to processing_node > -2 (not permanently failed);
    only_invalid restricts to processing_node < -1 (failed states).
    Falsy condition values in `kw` are skipped.
    """
    condition_list = [
      sqltest_dict[name](value, quote)
      for name, value in kw.iteritems() if value]
    if only_valid:
      condition_list.append('processing_node > -2')
    if only_invalid:
      condition_list.append('processing_node < -1')
    return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
      self.sql_table, " AND ".join(condition_list) or "1")
297

298 299
  def getPriority(self, activity_tool, processing_node, node_set=None):
    """Return (effective_priority, date) of the most urgent executable message.

    Effective priority is 3*priority, lowered by 1 for messages targeted at
    this node (or a node in `node_set`) and raised by 1 for messages targeted
    at other nodes, so node-affine work is preferred without starving the
    rest. Falls back to Queue.getPriority when no row matches.
    """
    if node_set is None:
      q = ("SELECT 3*priority, date FROM %s"
        " WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
        " ORDER BY priority, date LIMIT 1" % self.sql_table)
    else:
      # {} placeholders: first the priority adjustment, then the node filter.
      subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
        " WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
        " ORDER BY priority, date LIMIT 1)" % self.sql_table).format
      node = 'node=%s' % processing_node
      # "ALL" on all but one, to incur deduplication cost only once.
      # "UNION ALL" between the two naturally distinct sets.
      q = ("SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
        " ORDER BY effective_priority, date LIMIT 1" % (
          subquery(-1, node),
          subquery('', 'node=0'),
          subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
          ' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
        ))
    result = activity_tool.getSQLConnection().query(q, 0)[1]
    if result:
      return result[0]
    return Queue.getPriority(self, activity_tool, processing_node, node_set)
321

322
  def _retryOnLockError(self, method, args=(), kw=None):
    """Call method(*args, **kw), retrying forever on database lock errors.

    This code assumes that the database adapter translates a lock error
    into a ConflictError.

    Fix: the previous signature used a mutable default (`kw={}`); that
    single dict was shared by every call, so a caller mutating it could
    leak keyword arguments into unrelated calls. A None sentinel is
    backward-compatible and avoids this.
    """
    if kw is None:
      kw = {}
    while True:
      try:
        return method(*args, **kw)
      except ConflictError:
        LOG('SQLBase', INFO, 'Got a lock error, retrying...')
330

331
  # Validation private methods
332
  def getValidationSQL(self, quote, activate_kw, same_queue):
    """Render the query selecting messages this activity may depend on.

    For each non-None dependency in `activate_kw`, delegate condition
    rendering to the matching `_validate_<name>` method if it exists.
    Returns None when no condition could be rendered.
    """
    validate_list = []
    for k, v in activate_kw.iteritems():
      if v is not None:
        try:
          method = getattr(self, '_validate_' + k, None)
          if method:
            validate_list.append(' AND '.join(method(v, quote)))
        except Exception:
          # A broken dependency value must not be silently ignored.
          LOG('CMFActivity', WARNING, 'invalid %s value: %r' % (k, v),
              error=True)
          # Prevent validation by depending on anything, at least itself.
          validate_list = '1',
          same_queue = False
          break
    if validate_list:
      return ("SELECT '%s' as activity, uid, date, processing_node,"
              " priority, group_method_id, message FROM %s"
              " WHERE processing_node > -10 AND (%s) LIMIT %s" % (
                type(self).__name__, self.sql_table,
                ' OR '.join(validate_list),
                READ_MESSAGE_LIMIT if same_queue else 1))
354

355 356
  def _validate_after_method_id(self, *args):
    """Render the after_method_id dependency as a 1-tuple of SQL conditions."""
    condition = sqltest_dict['method_id'](*args)
    return (condition,)
357

358 359
  def _validate_after_path(self, *args):
    """Render the after_path dependency as a 1-tuple of SQL conditions."""
    condition = sqltest_dict['path'](*args)
    return (condition,)
360

361 362
  def _validate_after_message_uid(self, *args):
    """Render the after_message_uid dependency as a 1-tuple of SQL conditions."""
    condition = sqltest_dict['uid'](*args)
    return (condition,)
363

364 365 366 367
  def _validate_after_path_and_method_id(self, value, quote):
    """Render the (path, method_id) dependency as two ANDed SQL conditions."""
    path, method_id = value
    method_condition = sqltest_dict['method_id'](method_id, quote)
    path_condition = sqltest_dict['path'](path, quote)
    return (method_condition, path_condition)
368

369 370
  def _validate_after_tag(self, *args):
    """Render the after_tag dependency as a 1-tuple of SQL conditions."""
    condition = sqltest_dict['tag'](*args)
    return (condition,)
371

372 373 374 375
  def _validate_after_tag_and_method_id(self, value, quote):
    """Render the (tag, method_id) dependency as two ANDed SQL conditions."""
    tag, method_id = value
    method_condition = sqltest_dict['method_id'](method_id, quote)
    tag_condition = sqltest_dict['tag'](tag, quote)
    return (method_condition, tag_condition)
376

377 378
  def _validate_serialization_tag(self, *args):
    """Render the serialization_tag dependency.

    Also requires processing_node > -1 so that permanently failed or
    still-unvalidated messages do not block serialization.
    """
    tag_condition = sqltest_dict['serialization_tag'](*args)
    return 'processing_node > -1', tag_condition
379 380 381

  def _log(self, severity, summary):
    """Log `summary` under this class's name; above INFO, include traceback."""
    with_error = severity > INFO
    LOG(self.__class__.__name__, severity, summary, error=with_error)
383

384
  def distribute(self, activity_tool, node_count):
    """Validate pending messages and mark executable ones distributable.

    Scans messages with processing_node=-1 (not yet validated) in batches
    of READ_MESSAGE_LIMIT, in (priority, date, uid) order, and assigns
    processing_node=0 to those whose dependencies are satisfied. Stops
    after MAX_VALIDATED_LIMIT validated messages or when the table is
    exhausted. `node_count` is unused here.
    """
    db = activity_tool.getSQLConnection()
    now_date = getNow(db)
    where_kw = {
      'processing_node': -1,
      'to_date': now_date,
      'count': READ_MESSAGE_LIMIT,
    }
    validated_count = 0
    while 1:
      result = self._getMessageList(db, **where_kw)
      if not result:
        return
      transaction.commit()

      validation_text_dict = {'none': 1}
      message_dict = {}
      for line in result:
        message = Message.load(line.message, uid=line.uid, line=line)
        if not hasattr(message, 'order_validation_text'): # BBB
          message.order_validation_text = self.getOrderValidationText(message)
        self.getExecutableMessageList(activity_tool, message, message_dict,
                                      validation_text_dict, now_date=now_date)
      transaction.commit()
      if message_dict:
        distributable_uid_set = set()
        serialization_tag_dict = {}
        for message in message_dict.itervalues():
          serialization_tag = message.activity_kw.get('serialization_tag')
          if serialization_tag is None:
            distributable_uid_set.add(message.uid)
          else:
            serialization_tag_dict.setdefault(serialization_tag,
                                              []).append(message)
        # Among messages sharing a serialization tag, only the first in sort
        # order (plus those sharing its group_method_id) become distributable.
        for message_list in serialization_tag_dict.itervalues():
          # Sort list of messages to validate the message with highest score
          message_list.sort(key=sort_message_key)
          distributable_uid_set.add(message_list[0].uid)
          group_method_id = message_list[0].line.group_method_id
          if group_method_id == '\0':
            continue
          for message in message_list[1:]:
            if group_method_id == message.line.group_method_id:
              distributable_uid_set.add(message.uid)
        distributable_count = len(distributable_uid_set)
        if distributable_count:
          self.assignMessageList(db, 0, distributable_uid_set)
          validated_count += distributable_count
          if validated_count >= MAX_VALIDATED_LIMIT:
            return
      # Resume the scan just after the last row of this batch (relies on
      # `line` still being bound to the last iterated row).
      where_kw['above_priority_date_uid'] = (line.priority, line.date, line.uid)
435

436
  def getReservedMessageList(self, db, date, processing_node, limit,
                             group_method_id=None, node_set=None):
    """
      Get and reserve a list of messages.
      limit
        Maximum number of messages to fetch.
        This number is not guaranteed to be reached, because of not enough
        messages being pending execution.
    """
    assert limit
    quote = db.string_literal
    query = db.query
    args = (self.sql_table, sqltest_dict['to_date'](date, quote),
            ' AND group_method_id=' + quote(group_method_id)
            if group_method_id else '' , limit)

    # Get reservable messages.
    # During normal operation, sorting by date (as last criteria) is fairer
    # for users and reduce the probability to do the same work several times
    # (think of an object that is modified several times in a short period of
    # time).
    if node_set is None:
      result = Results(query(
        "SELECT * FROM %s WHERE processing_node=0 AND %s%s"
        " ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
    else:
      # We'd like to write
      #   ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
      # but this makes indices inefficient.
      subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
        " WHERE {} AND processing_node=0 AND %s%s"
        " ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
      node = 'node=%s' % processing_node
      result = Results(query(
        # "ALL" on all but one, to incur deduplication cost only once.
        # "UNION ALL" between the two naturally distinct sets.
        "SELECT * FROM (%s UNION ALL %s UNION %s%s) as t"
        " ORDER BY effective_priority, date LIMIT %s"% (
            subquery(-1, node),
            subquery('', 'node=0'),
            subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 'node>=0'),
            ' UNION ALL ' + subquery(-1, 'node IN (%s)' % ','.join(map(str, node_set))) if node_set else '',
            limit), 0))
    if result:
      # Reserve messages.
      uid_list = [x.uid for x in result]
      self.assignMessageList(db, processing_node, uid_list)
      self._log(TRACE, 'Reserved messages: %r' % uid_list)
      return result
    return ()

  def assignMessageList(self, db, state, uid_list):
    """Set processing_node to `state` for the given message uids.

    The statement is committed immediately (the trailing \\0COMMIT is
    executed as a second statement by the connector).
    """
    uids = ','.join(map(str, uid_list))
    db.query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
      self.sql_table, state, uids))
493

494
  def getProcessableMessageLoader(self, db, processing_node):
    """Return a callable turning a result row into (message, uid, ()).

    Base implementation performs no merging of duplicate messages, hence
    the empty duplicate-uid tuple.
    """
    def load(line):
      message_uid = line.uid
      message = Message.load(line.message, uid=message_uid, line=line)
      return message, message_uid, ()
    return load

502 503
  def getProcessableMessageList(self, activity_tool, processing_node,
                                node_family_id_list):
    """
      Always true:
        For each reserved message, delete redundant messages when it gets
        reserved (definitely lost, but they are expandable since redundant).

      - reserve a message
      - if this message has a group_method_id:
        - reserve a bunch of messages
        - until the total "cost" of the group goes over 1
          - get one message from the reserved bunch (this messages will be
            "needed")
          - update the total cost
        - unreserve "unneeded" messages
      - return still-reserved message list and a group_method_id

      If any error happens in above described process, try to unreserve all
      messages already reserved in that process.
      If it fails, complain loudly that some messages might still be in an
      unclean state.

      Returned values:
        3-tuple:
          - list of messages
          - group_method_id
          - uid_to_duplicate_uid_list_dict
    """
    db = activity_tool.getSQLConnection()
    now_date = getNow(db)
    uid_to_duplicate_uid_list_dict = {}
    try:
      while 1: # not a loop
        # Select messages that were either assigned manually or left
        # unprocessed after a shutdown. Most of the time, there's none.
        # To minimize the probability of deadlocks, we also COMMIT so that a
        # new transaction starts on the first 'FOR UPDATE' query, which is all
        # the more important as the current on started with getPriority().
        result = db.query("SELECT * FROM %s WHERE processing_node=%s"
          " ORDER BY priority, date LIMIT 1\0COMMIT" % (
          self.sql_table, processing_node), 0)
        already_assigned = result[1]
        if already_assigned:
          result = Results(result)
        else:
          result = self.getReservedMessageList(db, now_date, processing_node,
                                               1, node_set=node_family_id_list)
          if not result:
            break
        load = self.getProcessableMessageLoader(db, processing_node)
        m, uid, uid_list = load(result[0])
        message_list = [m]
        uid_to_duplicate_uid_list_dict[uid] = uid_list
        group_method_id = m.line.group_method_id
        if group_method_id[0] != '\0':
          # Count the number of objects to prevent too many objects.
          cost = m.activity_kw.get('group_method_cost', .01)
          assert 0 < cost <= 1, (self.sql_table, uid)
          count = m.getObjectCount(activity_tool)
          # this is heuristic (messages with same group_method_id
          # are likely to have the same group_method_cost)
          limit = int(1. / cost + 1 - count)
          if limit > 1: # <=> cost * count < 1
            cost *= count
            # Retrieve objects which have the same group method.
            result = iter(already_assigned
              and Results(db.query("SELECT * FROM %s"
                " WHERE processing_node=%s AND group_method_id=%s"
                " ORDER BY priority, date LIMIT %s" % (
                self.sql_table, processing_node,
                db.string_literal(group_method_id), limit), 0))
                # Do not optimize rare case: keep the code simple by not
                # adding more results from getReservedMessageList if the
                # limit is not reached.
              or self.getReservedMessageList(db, now_date, processing_node,
                limit, group_method_id, node_family_id_list))
            for line in result:
              if line.uid in uid_to_duplicate_uid_list_dict:
                continue
              m, uid, uid_list = load(line)
              if m is None:
                # Loader reported a duplicate: remember its uids, skip it.
                uid_to_duplicate_uid_list_dict[uid] += uid_list
                continue
              uid_to_duplicate_uid_list_dict[uid] = uid_list
              cost += m.getObjectCount(activity_tool) * \
                      m.activity_kw.get('group_method_cost', .01)
              message_list.append(m)
              if cost >= 1:
                # Unreserve extra messages as soon as possible.
                uid_list = [line.uid for line in result if line.uid != uid]
                if uid_list:
                  self.assignMessageList(db, 0, uid_list)
        return message_list, group_method_id, uid_to_duplicate_uid_list_dict
    except:
      self._log(WARNING, 'Exception while reserving messages.')
      if uid_to_duplicate_uid_list_dict:
        to_free_uid_list = uid_to_duplicate_uid_list_dict.keys()
        for uid_list in uid_to_duplicate_uid_list_dict.itervalues():
          to_free_uid_list += uid_list
        try:
          self.assignMessageList(db, 0, to_free_uid_list)
        except:
          self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list)
        else:
          if to_free_uid_list:
            self._log(TRACE, 'Freed messages %r' % to_free_uid_list)
      else:
        self._log(TRACE, '(no message was reserved)')
    return (), None, None
611

612 613 614 615 616 617 618 619 620
  def _abort(self):
    """Roll back the current transaction.

    Some database adapters may raise even on abort; in that case log at
    PANIC level and re-raise, since objects may have been left modified
    accidentally.
    """
    try:
      transaction.abort()
    except:
      self._log(PANIC,
          'abort failed, thus some objects may be modified accidentally')
      raise

621
  # Queue semantic
622 623
  def dequeueMessage(self, activity_tool, processing_node,
                     node_family_id_list):
    """Reserve, invoke and finalize one message (or one message group).

    Returns True when nothing was processed (queue empty for this node),
    False otherwise.
    """
    message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
      self.getProcessableMessageList(activity_tool, processing_node,
        node_family_id_list)
    if message_list:
      # Remove group_id parameter from group_method_id
      group_method_id = group_method_id.split('\0')[0]
      if group_method_id != "":
        method = activity_tool.invokeGroup
        args = (group_method_id, message_list, self.__class__.__name__,
                hasattr(self, 'generateMessageUID'))
        activity_runtime_environment = ActivityRuntimeEnvironment(
          None,
          priority=min(x.line.priority for x in message_list),
        )
      else:
        method = activity_tool.invoke
        message, = message_list
        args = message_list
        activity_runtime_environment = ActivityRuntimeEnvironment(message)
      # Commit right before executing messages.
      # As MySQL transaction does not start exactly at the same time as ZODB
      # transactions but a bit later, messages available might be called
      # on objects which are not available - or available in an old
      # version - to ZODB connector.
      # So all connectors must be committed now that we have selected
      # everything needed from MySQL to get a fresh view of ZODB objects.
      transaction.commit()
      transaction.begin()
      # Try to invoke
      try:
        # Refer Timeout.activity_timeout instead of
        #   from Products.ERP5Type.Timeout import activity_timeout
        # so that we can override the value in Timeout namescope in unit tests.
        offset = Timeout.activity_timeout
        with activity_runtime_environment, Deadline(offset):
          method(*args)
        # Abort if at least 1 message failed. On next tic, only those that
        # succeeded will be selected because their at_date won't have been
        # increased.
        for m in message_list:
          if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
            raise _DequeueMessageException
        transaction.commit()
      except:
        exc_info = sys.exc_info()
        if exc_info[1] is not _DequeueMessageException:
          self._log(WARNING,
            'Exception raised when invoking messages (uid, path, method_id) %r'
            % [(m.uid, m.object_path, m.method_id) for m in message_list])
          for m in message_list:
            m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        self._abort()
        exc_info = message_list[0].exc_info
        # NOTE(review): `message` is bound only in the non-group branch above;
        # if this callback path can be reached after a group invocation, the
        # references below raise NameError — confirm whether group messages
        # may carry exc_info here.
        if exc_info:
          try:
            # Register it again.
            with activity_runtime_environment:
              cancel = message.on_error_callback(*exc_info)
            del exc_info, message.exc_info
            transaction.commit()
            if cancel:
              message.setExecutionState(MESSAGE_EXECUTED)
          except:
            self._log(WARNING, 'Exception raised when processing error callbacks')
            message.setExecutionState(MESSAGE_NOT_EXECUTED)
            self._abort()
      self.finalizeMessageExecution(activity_tool, message_list,
                                    uid_to_duplicate_uid_list_dict)
    transaction.commit()
    return not message_list

695 696 697 698 699 700 701 702 703 704 705 706
  def deleteMessageList(self, db, uid_list):
    """Permanently remove the given messages from the queue table."""
    uids = ','.join(map(str, uid_list))
    db.query("DELETE FROM %s WHERE uid IN (%s)" % (self.sql_table, uids))

  def reactivateMessageList(self, db, uid_list, delay, retry):
    """Postpone execution of the given messages by `delay` seconds.

    When `retry` is true, also increment their priority and retry counters.
    """
    retry_clause = (
      ", priority = priority + 1, retry = retry + 1" if retry else "")
    db.query("UPDATE %s SET"
      " date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)"
      "%s WHERE uid IN (%s)" % (
        self.sql_table, delay, retry_clause,
        ",".join(map(str, uid_list))))

  def finalizeMessageExecution(self, activity_tool, message_list,
                               uid_to_duplicate_uid_list_dict=None):
    """
      If everything was fine, delete all messages.
      If anything failed, make successful messages available (if any), and
      the following rules apply to failed messages:
        - Failures due to ConflictErrors cause messages to be postponed,
          but their retry count is *not* increased.
        - Failures of messages already above maximum retry count cause them to
          be put in a permanent-error state.
        - In all other cases, retry count is increased and message is delayed.
    """
    db = activity_tool.getSQLConnection()
    # Messages are sorted into buckets by execution state; each bucket is
    # then flushed to SQL with a single query below.
    deletable_uid_list = []
    delay_uid_list = []
    final_error_uid_list = []
    make_available_uid_list = []
    notify_user_list = []
    # By default, executed messages are deleted. But when duplicates are
    # tracked and at least one message failed, executed ones are made
    # available again instead (the caller aborted the transaction on
    # failure, so their work was rolled back and must be redone).
    executed_uid_list = deletable_uid_list
    if uid_to_duplicate_uid_list_dict is not None:
      for m in message_list:
        if m.getExecutionState() == MESSAGE_NOT_EXECUTED:
          executed_uid_list = make_available_uid_list
          break
    for m in message_list:
      uid = m.uid
      if m.getExecutionState() == MESSAGE_EXECUTED:
        executed_uid_list.append(uid)
        if uid_to_duplicate_uid_list_dict is not None:
          # Duplicates of an executed message share its fate.
          executed_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
      elif m.getExecutionState() == MESSAGE_NOT_EXECUTED:
        # Should duplicate messages follow strictly the original message, or
        # should they be just made available again ?
        if uid_to_duplicate_uid_list_dict is not None:
          make_available_uid_list += uid_to_duplicate_uid_list_dict.get(uid, ())
        # A retryable ConflictError, or a SkippedMessage, only delays the
        # message without counting the attempt.
        if (m.exc_type and # m.exc_type may be None
            (m.conflict_retry if issubclass(m.exc_type, ConflictError) else
             m.exc_type is SkippedMessage)):
          delay_uid_list.append(uid)
        else:
          max_retry = m.max_retry
          retry = m.line.retry
          # Give up on messages past their retry budget, and on timeouts
          # (retrying a timed-out message would most likely time out again).
          if (max_retry is not None and retry >= max_retry) or \
              m.exc_type == TimeoutReachedError:
            # Always notify when we stop retrying.
            notify_user_list.append((m, False))
            final_error_uid_list.append(uid)
            continue
          # In case of infinite retry, notify the user
          # when the default limit is reached.
          if max_retry is None and retry == DEFAULT_MAX_RETRY:
            notify_user_list.append((m, True))
          delay = m.delay
          if delay is None:
            # By default, make delay quadratic to the number of retries.
            delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) * 2
          try:
            # Immediately update, because values different for every message
            self.reactivateMessageList(db, (uid,), delay, True)
          except:
            self._log(WARNING, 'Failed to reactivate %r' % uid)
        make_available_uid_list.append(uid)
      else: # MESSAGE_NOT_EXECUTABLE
        # 'path' does not point to any object. Activities are normally flushed
        # (without invoking them) when an object is deleted, but this is only
        # an optimisation. There is no efficient and reliable way to do such
        # this, because a concurrent and very long transaction may be about to
        # activate this object, without conflict.
        # So we have to clean up any remaining activity.
        deletable_uid_list.append(uid)
    # From here on, every SQL step is best-effort: a failure is logged but
    # must not prevent the remaining buckets from being processed.
    if deletable_uid_list:
      try:
        self._retryOnLockError(self.deleteMessageList, (db, deletable_uid_list))
      except:
        self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list)
      else:
        self._log(TRACE, 'Deleted messages %r' % deletable_uid_list)
    if delay_uid_list:
      try:
        # If this is a conflict error, do not increase 'retry' but only delay.
        self.reactivateMessageList(db, delay_uid_list,
                                   VALIDATION_ERROR_DELAY, False)
      except:
        self._log(ERROR, 'Failed to delay %r' % delay_uid_list)
    if final_error_uid_list:
      try:
        # Park permanently-failed messages so they are no longer selected.
        self.assignMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list)
      except:
        self._log(ERROR, 'Failed to set message to error state for %r'
                         % final_error_uid_list)
    if make_available_uid_list:
      try:
        # Assign to node 0, i.e. unreserve (cf. the log messages below).
        self.assignMessageList(db, 0, make_available_uid_list)
      except:
        self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list)
      else:
        self._log(TRACE, 'Freed messages %r' % make_available_uid_list)
    try:
      for m, retry in notify_user_list:
        m.notifyUser(activity_tool, retry)
    except:
      # Notification failures must not cause this method to raise.
      self._log(WARNING,
        'Exception during notification phase of finalizeMessageExecution')

  def flush(self, activity_tool, object_path, invoke=0, method_id=None, only_safe=False, **kw):
    """
      Flush all messages registered for the object at `object_path`
      (a tuple), optionally restricted to `method_id`.

      If `invoke` is true, each message is executed (at most once per
      message UID) before being discarded; a failed or non-validatable
      invocation raises ActivityFlushError.  If `only_safe` is true, only
      SQL rows with processing_node <= -2 are removed from the table.

      object_path is a tuple
    """
    path = '/'.join(object_path)
    if invoke:
      invoked = set()
      # NOTE: the `invoke` flag parameter is deliberately shadowed by this
      # closure, which performs the actual invocation of a single message;
      # later `if invoke:` tests remain truthy because functions are truthy.
      def invoke(message):
        try:
          # Deduplicate: a message UID is only invoked once per flush.
          key = self.generateMessageUID(message)
          if key in invoked:
            return
          invoked.add(key)
        except AttributeError:
          # NOTE(review): generateMessageUID apparently may fail with
          # AttributeError for some messages -- in that case the message is
          # invoked without deduplication. TODO confirm which attribute.
          pass
        line = getattr(message, 'line', None)
        # Invoke only if the message is already validated (processing_node
        # != -1) or has no pending dependencies.
        if (line and line.processing_node != -1 or
            not activity_tool.getDependentMessageList(message)):
          # Try to invoke the message - what happens if invoke calls flushActivity ??
          with ActivityRuntimeEnvironment(message):
            activity_tool.invoke(message)
          if message.getExecutionState() != MESSAGE_EXECUTED:
            raise ActivityFlushError('Could not invoke %s on %s'
                                     % (message.method_id, path))
        else:
          raise ActivityFlushError('Could not validate %s on %s'
                                   % (message.method_id, path))
    # First, flush matching messages that are still only registered in
    # memory (not yet persisted to SQL).
    for m in activity_tool.getRegisteredMessageList(self):
      if object_path == m.object_path and (
         method_id is None or method_id == m.method_id):
        if invoke:
          invoke(m)
        activity_tool.unregisterMessage(self, m)
    # Then, flush the messages already stored in the SQL table.
    uid_list = []
    db = activity_tool.getSQLConnection()
    for line in self._getMessageList(db, path=path,
        **({'method_id': method_id} if method_id else {})):
      if only_safe and line.processing_node > -2:
        continue
      uid_list.append(line.uid)
      # Only invoke rows not currently reserved by a processing node.
      if invoke and line.processing_node <= 0:
        invoke(Message.load(line.message, uid=line.uid, line=line))
    if uid_list:
      self.deleteMessageList(db, uid_list)

  # Required for tests
  def timeShift(self, activity_tool, delay, processing_node=None):
    """
      To simulate time shift, we simply substract delay from
      all dates in message(_queue) table

      `delay` is in seconds; when `processing_node` is given, only rows
      assigned to that node are shifted.
    """
    # Note the leading space before WHERE: without it the statement reads
    # "...SECOND)WHERE...", which only works because MySQL happens to
    # tokenize ')' as a delimiter -- too fragile to rely on.
    activity_tool.getSQLConnection().query("UPDATE %s SET"
      " date = DATE_SUB(date, INTERVAL %s SECOND)"
      % (self.sql_table, delay)
      + ('' if processing_node is None else
         " WHERE processing_node=%s" % processing_node))