Commit 301962ad authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: new activate() parameter to prefer executing on the same node

The goal is to make better use of the ZODB Storage cache. It is common to do
processing on a data set in several sequential transactions: in such case, by
continuing execution of these messages on the same node, data is loaded from
ZODB only once. Without this, and if there are many other messages to process,
processing always continue on a random node, causing much more load from ZODB.

To prevent nodes from having too much work to do, or too little compared to
other nodes, this new parameter is only a hint for CMFActivity. It remains
possible for a node to execute a message that was intended for another node.

Before this commit, a processing node selects the first message(s) according to
the following ordering:

  priority, date

and now:

  priority, node_preference, date

where node_preference is:

  -1 -> same node
   0 -> no preferred node
   1 -> another node

The implementation is tricky for 2 reasons:
- MariaDB can't order this way in a single simple query, so we have 1
  subquery for each case, potentially getting 3 times the wanted maximum of
  messages, then order/filter on the resulting union.
- MariaDB also can't filter efficiently messages for other nodes, so the 3rd
  subquery returns messages for any node, potentially duplicating results from
  the first 2 subqueries. This works because they'll be ordered last.
  Unfortunately, this requires extra indices.

In any case, message reservation must be very efficient, or MariaDB deadlocks
quickly happen, and locking an activity table during reservation reduces
parallelism too much.

In addition to better cache efficiency, this new feature can be used as a
workaround for a bug affecting serialiation_tag, causing IntegrityError when
reindexing many new objects. If you have 2 recursive reindexations for both a
document and one of its lines, and if you have so many messages than grouping
is split between these 2 messages, then you end up with 2 nodes indexing the
same line in parallel: for some tables, the pattern DELETE+INSERT conflicts
since InnoDB does not take any lock when deleting a non-existent row.

If you have many activities creating such documents, you can combine with
grouping and appropriate priority to make sure that such pair of messages won't
be executed on different nodes, except maybe at the end (when there's no
document to create anymore; then activity reexecution may be enough).
For example:

  from Products.CMFActivity.ActivityTool import getCurrentNode
  portal.setPlacelessDefaultReindexParameters(
    activate_kw={'node': 'same', 'priority': priority},
    group_id=getCurrentNode())

where `priority` is the same as the activity containing the above code, which
can also use grouping without increasing the probability of IntegrityError.
parent 3ca5bf97
...@@ -60,9 +60,29 @@ class ActiveObject(ExtensionClass.Base): ...@@ -60,9 +60,29 @@ class ActiveObject(ExtensionClass.Base):
activate_kw=None, REQUEST=None, **kw): activate_kw=None, REQUEST=None, **kw):
"""Returns an active wrapper for this object. """Returns an active wrapper for this object.
Reserved Optional parameters: priority -- any integer between -128 and 127 included
(default: 1)
node -- SQLDict & SQLQueue only;
can be one of the following values:
- "same": prefer execution on this node, to make
better use of the ZODB Storage cache
- "": no node preference
at_date -- request execution date for this activate call at_date -- request execution date for this activate call
(default: date of commit)
Messages are executed according to the following ordering:
priority, node_preference, date
where node_preference is:
-1 -> same node
0 -> no preferred node
1 -> another node
Validation parameters:
after_method_id -- never validate message if after_method_id after_method_id -- never validate message if after_method_id
is in the list of methods which are is in the list of methods which are
......
...@@ -187,11 +187,11 @@ class Queue(object): ...@@ -187,11 +187,11 @@ class Queue(object):
""" """
pass pass
def getPriority(self, activity_tool): def getPriority(self, activity_tool, node):
""" """
Get priority from this queue. Get priority from this queue.
Lower number means higher priority value. Lower number means higher priority value.
Legal value range is [-128, 127]. Legal value range is [-385, 382].
Values out of this range might work, but are non-standard. Values out of this range might work, but are non-standard.
""" """
return 128, return 384,
...@@ -134,6 +134,7 @@ CREATE TABLE %s ( ...@@ -134,6 +134,7 @@ CREATE TABLE %s (
`method_id` VARCHAR(255) NOT NULL, `method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1, `processing_node` SMALLINT NOT NULL DEFAULT -1,
`priority` TINYINT NOT NULL DEFAULT 0, `priority` TINYINT NOT NULL DEFAULT 0,
`node` SMALLINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '', `group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL, `tag` VARCHAR(255) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL, `serialization_tag` VARCHAR(255) NOT NULL,
...@@ -141,7 +142,9 @@ CREATE TABLE %s ( ...@@ -141,7 +142,9 @@ CREATE TABLE %s (
`message` LONGBLOB NOT NULL, `message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`), PRIMARY KEY (`uid`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`), KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node2_priority_date` (`processing_node`, `node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`), KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `node2_group_priority_date` (`processing_node`, `node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`), KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`path`), KEY (`path`),
KEY (`active_process_uid`), KEY (`active_process_uid`),
...@@ -172,7 +175,7 @@ CREATE TABLE %s ( ...@@ -172,7 +175,7 @@ CREATE TABLE %s (
_insert_template = ("INSERT INTO %s (uid," _insert_template = ("INSERT INTO %s (uid,"
" path, active_process_uid, date, method_id, processing_node," " path, active_process_uid, date, method_id, processing_node,"
" priority, group_method_id, tag, serialization_tag," " priority, node, group_method_id, tag, serialization_tag,"
" message) VALUES\n(%s)") " message) VALUES\n(%s)")
_insert_separator = "),\n(" _insert_separator = "),\n("
...@@ -216,6 +219,7 @@ CREATE TABLE %s ( ...@@ -216,6 +219,7 @@ CREATE TABLE %s (
quote(m.method_id), quote(m.method_id),
'0' if order_validation_text == 'none' else '-1', '0' if order_validation_text == 'none' else '-1',
str(m.activity_kw.get('priority', 1)), str(m.activity_kw.get('priority', 1)),
str(m.activity_kw.get('node', 0)),
quote(m.getGroupId()), quote(m.getGroupId()),
quote(m.activity_kw.get('tag', '')), quote(m.activity_kw.get('tag', '')),
quote(m.activity_kw.get('serialization_tag', '')), quote(m.activity_kw.get('serialization_tag', '')),
...@@ -274,12 +278,26 @@ CREATE TABLE %s ( ...@@ -274,12 +278,26 @@ CREATE TABLE %s (
return "SELECT 1 FROM %s WHERE %s LIMIT 1" % ( return "SELECT 1 FROM %s WHERE %s LIMIT 1" % (
self.sql_table, " AND ".join(where) or "1") self.sql_table, " AND ".join(where) or "1")
def getPriority(self, activity_tool): def getPriority(self, activity_tool, node=None):
result = activity_tool.getSQLConnection().query( if node is None:
"SELECT priority, date FROM %s" q = ("SELECT 3*priority, date FROM %s"
" WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)" " WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1" % self.sql_table, 0)[1] " ORDER BY priority, date LIMIT 1" % self.sql_table)
return result[0] if result else Queue.getPriority(self, activity_tool) else:
subquery = ("(SELECT 3*priority{} as effective_priority, date FROM %s"
" WHERE {} AND processing_node=0 AND date <= UTC_TIMESTAMP(6)"
" ORDER BY priority, date LIMIT 1)" % self.sql_table).format
node = 'node=%s' % node
q = ("SELECT * FROM (%s UNION ALL %s UNION %s) as t"
" ORDER BY effective_priority, date LIMIT 1" % (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
))
result = activity_tool.getSQLConnection().query(q, 0)[1]
if result:
return result[0]
return Queue.getPriority(self, activity_tool, node)
def _retryOnLockError(self, method, args=(), kw={}): def _retryOnLockError(self, method, args=(), kw={}):
while True: while True:
...@@ -398,7 +416,7 @@ CREATE TABLE %s ( ...@@ -398,7 +416,7 @@ CREATE TABLE %s (
where_kw['above_uid'] = line.uid where_kw['above_uid'] = line.uid
def getReservedMessageList(self, db, date, processing_node, limit, def getReservedMessageList(self, db, date, processing_node, limit,
group_method_id=None): group_method_id=None, node=None):
""" """
Get and reserve a list of messages. Get and reserve a list of messages.
limit limit
...@@ -418,10 +436,25 @@ CREATE TABLE %s ( ...@@ -418,10 +436,25 @@ CREATE TABLE %s (
# for users and reduce the probability to do the same work several times # for users and reduce the probability to do the same work several times
# (think of an object that is modified several times in a short period of # (think of an object that is modified several times in a short period of
# time). # time).
if 1: if node is None:
result = Results(query( result = Results(query(
"SELECT * FROM %s WHERE processing_node=0 AND %s%s" "SELECT * FROM %s WHERE processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0)) " ORDER BY priority, date LIMIT %s FOR UPDATE" % args, 0))
else:
# We'd like to write
# ORDER BY priority, IF(node, IF(node={node}, -1, 1), 0), date
# but this makes indices inefficient.
subquery = ("(SELECT *, 3*priority{} as effective_priority FROM %s"
" WHERE {} AND processing_node=0 AND %s%s"
" ORDER BY priority, date LIMIT %s FOR UPDATE)" % args).format
node = 'node=%s' % node
result = Results(query(
"SELECT * FROM (%s UNION ALL %s UNION %s) as t"
" ORDER BY effective_priority, date LIMIT %s"% (
subquery(-1, node),
subquery('', 'node=0'),
subquery('+IF(node, IF(%s, -1, 1), 0)' % node, 1),
limit), 0))
if result: if result:
# Reserve messages. # Reserve messages.
uid_list = [x.uid for x in result] uid_list = [x.uid for x in result]
...@@ -490,7 +523,7 @@ CREATE TABLE %s ( ...@@ -490,7 +523,7 @@ CREATE TABLE %s (
result = Results(result) result = Results(result)
else: else:
result = self.getReservedMessageList(db, now_date, processing_node, result = self.getReservedMessageList(db, now_date, processing_node,
1) 1, node=processing_node)
if not result: if not result:
break break
load = self.getProcessableMessageLoader(db, processing_node) load = self.getProcessableMessageLoader(db, processing_node)
...@@ -519,7 +552,7 @@ CREATE TABLE %s ( ...@@ -519,7 +552,7 @@ CREATE TABLE %s (
# adding more results from getReservedMessageList if the # adding more results from getReservedMessageList if the
# limit is not reached. # limit is not reached.
or self.getReservedMessageList(db, now_date, processing_node, or self.getReservedMessageList(db, now_date, processing_node,
limit, group_method_id)) limit, group_method_id, processing_node))
for line in result: for line in result:
if line.uid in uid_to_duplicate_uid_list_dict: if line.uid in uid_to_duplicate_uid_list_dict:
continue continue
......
...@@ -178,3 +178,11 @@ CREATE TABLE %s ( ...@@ -178,3 +178,11 @@ CREATE TABLE %s (
# earlier. # earlier.
return None, original_uid, [uid] return None, original_uid, [uid]
return load return load
def getPriority(self, activity_tool, node):
return SQLDict.getPriority(self, activity_tool)
def getReservedMessageList(self, db, date, processing_node,
limit=None, group_method_id=None, node=None):
return SQLDict.getReservedMessageList(self, db,
date, processing_node, limit, group_method_id)
...@@ -1112,7 +1112,7 @@ class ActivityTool (BaseTool): ...@@ -1112,7 +1112,7 @@ class ActivityTool (BaseTool):
# getPriority does not see messages dequeueMessage would process. # getPriority does not see messages dequeueMessage would process.
activity_list = activity_dict.values() activity_list = activity_dict.values()
def sort_key(activity): def sort_key(activity):
return activity.getPriority(self) return activity.getPriority(self, processing_node)
while is_running_lock.acquire(0): while is_running_lock.acquire(0):
try: try:
activity_list.sort(key=sort_key) # stable sort activity_list.sort(key=sort_key) # stable sort
...@@ -1181,7 +1181,9 @@ class ActivityTool (BaseTool): ...@@ -1181,7 +1181,9 @@ class ActivityTool (BaseTool):
thread_activity_buffer[my_thread_key] = buffer thread_activity_buffer[my_thread_key] = buffer
return buffer return buffer
def activateObject(self, object, activity=DEFAULT_ACTIVITY, active_process=None, **kw): def activateObject(self, object, activity=DEFAULT_ACTIVITY,
active_process=None, serialization_tag=None,
node=None, **kw):
if active_process is None: if active_process is None:
active_process_uid = None active_process_uid = None
elif isinstance(active_process, str): elif isinstance(active_process, str):
...@@ -1201,8 +1203,16 @@ class ActivityTool (BaseTool): ...@@ -1201,8 +1203,16 @@ class ActivityTool (BaseTool):
except AttributeError: except AttributeError:
pass pass
url = object.getPhysicalPath() url = object.getPhysicalPath()
if kw.get('serialization_tag', False) is None: if serialization_tag is not None:
del kw['serialization_tag'] kw['serialization_tag'] = serialization_tag
if node is not None:
if node != 'same':
raise ValueError("Invalid node argument %r" % node)
try:
kw['node'] = 1 + self.getNodeList(
role=ROLE_PROCESSING).index(getCurrentNode())
except ValueError:
pass
return ActiveWrapper(self, url, oid, activity, return ActiveWrapper(self, url, oid, activity,
active_process, active_process_uid, kw, active_process, active_process_uid, kw,
getattr(self, 'REQUEST', None)) getattr(self, 'REQUEST', None))
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
import inspect import inspect
import unittest import unittest
from functools import wraps from functools import wraps
from itertools import product
from Products.ERP5Type.tests.utils import LogInterceptor from Products.ERP5Type.tests.utils import LogInterceptor
from Testing import ZopeTestCase from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
...@@ -81,6 +82,14 @@ def registerFailingTransactionManager(*args, **kw): ...@@ -81,6 +82,14 @@ def registerFailingTransactionManager(*args, **kw):
pass pass
dummy_tm()._register() dummy_tm()._register()
class LockOnce(object):
def __init__(self):
self.acquire = threading.Lock().acquire
def release(self):
pass
class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Different variables used for this test # Different variables used for this test
...@@ -149,6 +158,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -149,6 +158,14 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
user = uf.getUserById('seb').__of__(uf) user = uf.getUserById('seb').__of__(uf)
newSecurityManager(None, user) newSecurityManager(None, user)
def ticOnce(self, *args, **kw):
is_running_lock = ActivityTool.is_running_lock
try:
ActivityTool.is_running_lock = LockOnce()
self.portal.portal_activities.tic(*args, **kw)
finally:
ActivityTool.is_running_lock = is_running_lock
@for_each_activity @for_each_activity
def testInvokeAndCancelActivity(self, activity): def testInvokeAndCancelActivity(self, activity):
""" """
...@@ -2453,6 +2470,45 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): ...@@ -2453,6 +2470,45 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_address = self.portal.portal_activities.getServerAddress() activity_address = self.portal.portal_activities.getServerAddress()
self.assertEqual(activity_address, server_address) self.assertEqual(activity_address, server_address)
def test_nodePreference(self):
"""
Test node preference, i.e. 'node' parameter of activate()
An object is activated by 2 different nodes and the 2 messages are
processed by the node that created the newest one:
- without node preference: they're ordered by date
- with node preference: they're executed in reverse order (the
processing node executes its message first even if it's newer)
Correct ordering of queues is also checked, by including scenarios
in which one message is in SQLDict and the other in SQLQueue.
"""
activity_tool = self.portal.portal_activities
o = self.getOrganisation()
node_dict = dict(activity_tool.getNodeDict())
assert len(node_dict) == 1 and '' not in node_dict, node_dict
before = DateTime() - 1
activities = 'SQLDict', 'SQLQueue'
for activities in product(activities, activities):
for node, expected in (None, '21'), ("same", '12'):
o._setTitle('0')
# The dance around getNodeDict is to simulate the creation of
# activities from 2 different nodes. We also change title in 2
# different ways, so that SQLDict does not merge them.
o.activate(activity=activities[0], node=node)._setTitle('1')
activity_tool.getNodeDict = lambda: node_dict
node_dict[''] = ActivityTool.ROLE_PROCESSING
o.activate(activity=activities[1], node=node, at_date=before
)._setProperty('title', '2')
del node_dict['']
activity_tool._p_invalidate()
self.commit()
for title in expected:
self.ticOnce()
self.assertEqual(o.getTitle(), title, (activities, expected))
self.assertFalse(activity_tool.getMessageList())
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestCMFActivity)) suite.addTest(unittest.makeSuite(TestCMFActivity))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment