Commit bb07e0f4 authored by Julien Muchembled's avatar Julien Muchembled Committed by Cédric Le Ninivin

CMFActivity: move most SQL queries from DTML to Python

parent d7b3c627
......@@ -189,9 +189,6 @@ class Queue(object):
elif cached_result:
message_dict[message.uid] = message
def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw):
return 0
def flush(self, activity_tool, object, **kw):
pass
......@@ -216,14 +213,6 @@ class Queue(object):
def getMessageList(self, activity_tool, processing_node=None,**kw):
return []
def countMessage(self, activity_tool,**kw):
return 0
def countMessageWithTag(self, activity_tool,value):
"""Return the number of messages which match the given tag.
"""
return self.countMessage(activity_tool, tag=value)
# Transaction Management
def prepareQueueMessageList(self, activity_tool, message_list):
# Called to prepare transaction commit for queued messages
......
This diff is collapsed.
......@@ -26,6 +26,7 @@
#
##############################################################################
from Shared.DC.ZRDB.Results import Results
from Products.CMFActivity.ActivityTool import Message
import sys
#from time import time
......@@ -74,8 +75,9 @@ class SQLDict(SQLBase):
message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered]
def getProcessableMessageLoader(self, activity_tool, processing_node):
def getProcessableMessageLoader(self, db, processing_node):
path_and_method_id_dict = {}
quote = db.string_literal
def load(line):
# getProcessableMessageList already fetch messages with the same
# group_method_id, so what remains to be filtered on are path and
......@@ -87,6 +89,8 @@ class SQLDict(SQLBase):
uid = line.uid
original_uid = path_and_method_id_dict.get(key)
if original_uid is None:
sql_method_id = " AND method_id = %s AND group_method_id = %s" % (
quote(method_id), quote(line.group_method_id))
m = Message.load(line.message, uid=uid, line=line)
merge_parent = m.activity_kw.get('merge_parent')
try:
......@@ -101,11 +105,14 @@ class SQLDict(SQLBase):
path_list.append(path)
uid_list = []
if path_list:
result = activity_tool.SQLDict_selectParentMessage(
path=path_list,
method_id=method_id,
group_method_id=line.group_method_id,
processing_node=processing_node)
# Select parent messages.
result = Results(db.query("SELECT * FROM message"
" WHERE processing_node IN (0, %s) AND path IN (%s)%s"
" ORDER BY path LIMIT 1 FOR UPDATE" % (
processing_node,
','.join(map(quote, path_list)),
sql_method_id,
), 0))
if result: # found a parent
# mark child as duplicate
uid_list.append(uid)
......@@ -115,29 +122,34 @@ class SQLDict(SQLBase):
uid = line.uid
m = Message.load(line.message, uid=uid, line=line)
# return unreserved similar children
result = activity_tool.SQLDict_selectChildMessageList(
path=line.path,
method_id=method_id,
group_method_id=line.group_method_id)
reserve_uid_list = [x.uid for x in result]
path = line.path
result = db.query("SELECT uid FROM message"
" WHERE processing_node = 0 AND (path = %s OR path LIKE %s)"
"%s FOR UPDATE" % (
quote(path), quote(path.replace('_', r'\_') + '/%'),
sql_method_id,
), 0)[1]
reserve_uid_list = [x for x, in result]
uid_list += reserve_uid_list
if not line.processing_node:
# reserve found parent
reserve_uid_list.append(uid)
else:
result = activity_tool.SQLDict_selectDuplicatedLineList(
path=path,
method_id=method_id,
group_method_id=line.group_method_id)
reserve_uid_list = uid_list = [x.uid for x in result]
# Select duplicates.
result = db.query("SELECT uid FROM message"
" WHERE processing_node = 0 AND path = %s%s FOR UPDATE" % (
quote(path), sql_method_id,
), 0)[1]
reserve_uid_list = uid_list = [x for x, in result]
if reserve_uid_list:
activity_tool.SQLDict_reserveDuplicatedLineList(
processing_node=processing_node, uid=reserve_uid_list)
else:
activity_tool.SQLDict_commit() # release locks
db.query(
"UPDATE message SET processing_node=%s WHERE uid IN (%s)" % (
processing_node, ','.join(map(str, reserve_uid_list)),
))
db.query("COMMIT")
except:
self._log(WARNING, 'getDuplicateMessageUidList got an exception')
activity_tool.SQLDict_rollback() # release locks
self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK")
raise
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
......
......@@ -45,25 +45,7 @@ class SQLJoblib(SQLDict):
sql_table = 'message_job'
uid_group = 'portal_activity_job'
def initialize(self, activity_tool, clear):
"""
Initialize the message table using MYISAM Engine
"""
folder = activity_tool.getPortalObject().portal_skins.activity
try:
createMessageTable = folder.SQLJoblib_createMessageTable
except AttributeError:
return
if clear:
folder.SQLBase_dropMessageTable(table=self.sql_table)
createMessageTable()
else:
src = createMessageTable._upgradeSchema(create_if_not_exists=1,
initialize=self._initialize,
table=self.sql_table)
if src:
LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src))
_createMessageTable = 'SQLJoblib_createMessageTable'
def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
......@@ -114,8 +96,9 @@ class SQLJoblib(SQLDict):
else:
raise ValueError("Maximum retry for SQLBase_writeMessageList reached")
def getProcessableMessageLoader(self, activity_tool, processing_node):
def getProcessableMessageLoader(self, db, processing_node):
path_and_method_id_dict = {}
quote = db.string_literal
def load(line):
# getProcessableMessageList already fetch messages with the same
# group_method_id, so what remains to be filtered on are path, method_id
......@@ -128,19 +111,23 @@ class SQLJoblib(SQLDict):
if original_uid is None:
m = Message.load(line.message, uid=uid, line=line)
try:
result = activity_tool.SQLJoblib_selectDuplicatedLineList(
path=path,
method_id=method_id,
group_method_id=line.group_method_id,
signature=line.signature)
reserve_uid_list = uid_list = [x.uid for x in result]
if reserve_uid_list:
activity_tool.SQLBase_reserveMessageList(
table=self.sql_table,
processing_node=processing_node,
uid=reserve_uid_list)
# Select duplicates.
result = db.query("SELECT uid FROM message_job"
" WHERE processing_node = 0 AND path = %s AND signature = %s"
" AND method_id = %s AND group_method_id = %s FOR UPDATE" % (
quote(path), quote(line.signature),
quote(method_id), quote(line.group_method_id),
), 0)[1]
uid_list = [x for x, in result]
if uid_list:
db.query(
"UPDATE message_job SET processing_node=%s WHERE uid IN (%s)" % (
processing_node, ','.join(map(str, uid_list)),
))
db.query("COMMIT")
except:
self._log(WARNING, 'getDuplicateMessageUidList got an exception')
self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK")
raise
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
......
......@@ -655,11 +655,6 @@ class ActivityTool (BaseTool):
activity_timing_log = False
cancel_and_invoke_links_hidden = False
def SQLDict_setPriority(self, **kw):
real_SQLDict_setPriority = getattr(self.aq_parent, 'SQLDict_setPriority')
LOG('ActivityTool', 0, real_SQLDict_setPriority(src__=1, **kw))
return real_SQLDict_setPriority(**kw)
# Filter content (ZMI))
def filtered_meta_types(self, user=None):
# Filters the list of available meta types.
......@@ -670,6 +665,9 @@ class ActivityTool (BaseTool):
meta_types.append(meta_type)
return meta_types
def getSQLConnection(self):
return self.aq_inner.aq_parent.cmf_activity_sql_connection()
def maybeMigrateConnectionClass(self):
connection_id = 'cmf_activity_sql_connection'
sql_connection = getattr(self, connection_id, None)
......@@ -1127,14 +1125,16 @@ class ActivityTool (BaseTool):
def hasActivity(self, *args, **kw):
# Check in each queue if the object has deferred tasks
# if not argument is provided, then check on self
if len(args) > 0:
obj = args[0]
if args:
obj, = args
else:
obj = self
for activity in activity_dict.itervalues():
if activity.hasActivity(aq_inner(self), obj, **kw):
return True
return False
path = None if obj is None else '/'.join(obj.getPhysicalPath())
db = self.getSQLConnection()
quote = db.string_literal
return bool(db.query("(%s)" % ") UNION ALL (".join(
activity.hasActivitySQL(quote, path=path, **kw)
for activity in activity_dict.itervalues()))[1])
security.declarePrivate('getActivityBuffer')
def getActivityBuffer(self, create_if_not_found=True):
......@@ -1443,8 +1443,9 @@ class ActivityTool (BaseTool):
"""
if not(isinstance(message_uid_list, list)):
message_uid_list = [message_uid_list]
self.SQLBase_makeMessageListAvailable(table=activity_dict[activity].sql_table,
uid=message_uid_list)
if message_uid_list:
activity_dict[activity].unreserveMessageList(self.getSQLConnection(),
0, message_uid_list)
if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % (
self.absolute_url(), 'view'))
......@@ -1470,8 +1471,8 @@ class ActivityTool (BaseTool):
"""
if not(isinstance(message_uid_list, list)):
message_uid_list = [message_uid_list]
self.SQLBase_delMessage(table=activity_dict[activity].sql_table,
uid=message_uid_list)
activity_dict[activity].deleteMessageList(
self.getSQLConnection(), message_uid_list)
if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % (
self.absolute_url(), 'view'))
......@@ -1523,10 +1524,7 @@ class ActivityTool (BaseTool):
"""
Return the number of messages which match the given tag.
"""
message_count = 0
for activity in activity_dict.itervalues():
message_count += activity.countMessageWithTag(aq_inner(self), value)
return message_count
return self.countMessage(tag=value)
security.declarePublic('countMessage')
def countMessage(self, **kw):
......@@ -1540,10 +1538,11 @@ class ActivityTool (BaseTool):
tag : activities with a particular tag
message_uid : activities with a particular uid
"""
message_count = 0
for activity in activity_dict.itervalues():
message_count += activity.countMessage(aq_inner(self), **kw)
return message_count
db = self.getSQLConnection()
quote = db.string_literal
return sum(x for x, in db.query("(%s)" % ") UNION ALL (".join(
activity.countMessageSQL(quote, **kw)
for activity in activity_dict.itervalues()))[1])
security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
def newActiveProcess(self, REQUEST=None, **kw):
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
processing_node
uid:list
</params>
UPDATE
<dtml-var table>
SET
processing_node=<dtml-sqlvar processing_node type="int">,
processing=0
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid:list
</params>
DELETE FROM
<dtml-var table>
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table</params>
DROP TABLE IF EXISTS <dtml-var table>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
SELECT UTC_TIMESTAMP(6)
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
</params>
SELECT `priority`, `date` FROM
<dtml-var table>
WHERE
processing_node = 0
AND date <= UTC_TIMESTAMP(6)
ORDER BY priority, date
LIMIT 1
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
processing_node
to_date
count
group_method_id
</params>
SELECT
*
FROM
<dtml-var table>
WHERE
processing_node=0
AND date <= <dtml-sqlvar to_date type="datetime(6)">
<dtml-if expr="group_method_id is not None">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
</dtml-if>
ORDER BY
<dtml-comment>
During normal operation, sorting by date (as 2nd criteria) is fairer
for users and reduce the probability to do the same work several times
(think of an object that is modified several times in a short period of time).
</dtml-comment>
priority, date
LIMIT <dtml-sqlvar count type="int">
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
path
method_id
active_process_uid
only_valid
only_invalid</params>
SELECT count(path) as message_count FROM
<dtml-var table>
WHERE 1 = 1
<dtml-if expr="path is not None">AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if expr="method_id is not None">AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
<dtml-if expr="only_valid">AND processing_node > -2</dtml-if>
<dtml-if expr="only_invalid">AND processing_node < -1</dtml-if>
<dtml-if expr="active_process_uid is not None"> AND active_process_uid = <dtml-sqlvar active_process_uid type="int"> </dtml-if>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid</params>
UPDATE
<dtml-var table>
SET
processing_node=0,
processing=0
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid</params>
UPDATE
<dtml-var table>
SET
processing_date = UTC_TIMESTAMP(6),
processing = 1
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:100
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
uid:list
retry
delay
</params>
UPDATE
<dtml-var table>
SET
date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL
<dtml-sqlvar delay type="int"> SECOND)
<dtml-if expr="retry is not None">
, priority = priority + <dtml-sqlvar retry type="int">
, retry = retry + <dtml-sqlvar retry type="int">
</dtml-if>
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
processing_node
uid
</params>
UPDATE
<dtml-var table>
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
processing_node
group_method_id
count</params>
SELECT
*
FROM
<dtml-var table>
WHERE
processing_node = <dtml-sqlvar processing_node type="int">
<dtml-if expr="group_method_id is not None">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
</dtml-if>
<dtml-if expr="count is not None">
LIMIT <dtml-sqlvar count type="int">
</dtml-if>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>table
delay
processing_node</params>
UPDATE
<dtml-var table>
SET
date = DATE_SUB(date, INTERVAL <dtml-sqlvar delay type="int"> SECOND),
processing_date = DATE_SUB(processing_date, INTERVAL <dtml-sqlvar delay type="int"> SECOND)
<dtml-if expr="processing_node is not None">
WHERE <dtml-sqltest processing_node type="int">
</dtml-if>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
processing_node
uid
</params>
UPDATE
message
SET
processing_node=<dtml-sqlvar processing_node type="int">
WHERE
<dtml-sqltest uid type="int" multiple>
<dtml-var sql_delimiter>
COMMIT
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
ROLLBACK
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
</params>
SELECT uid FROM
message
WHERE
processing_node = 0
AND (path = <dtml-sqlvar path type="string">
OR path LIKE <dtml-sqlvar type="string"
expr="path.replace('_', r'\_') + '/%'">)
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
</params>
SELECT uid FROM
message
WHERE
processing_node = 0
AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
processing_node
</params>
SELECT * FROM
message
WHERE
processing_node IN (0, <dtml-sqlvar processing_node type="int">)
AND <dtml-sqltest path type="string" multiple>
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
ORDER BY path
LIMIT 1
FOR UPDATE
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
path
method_id
group_method_id
signature
</params>
SELECT uid FROM
message_job
WHERE
processing_node = 0
AND path = <dtml-sqlvar path type="string">
AND method_id = <dtml-sqlvar method_id type="string">
AND group_method_id = <dtml-sqlvar group_method_id type="string">
AND signature = <dtml-sqlvar signature type="string">
FOR UPDATE
......@@ -2184,7 +2184,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity.getProcessableMessageList(activity_tool, 3)
self.commit()
result = activity._getMessageList(activity_tool)
result = activity._getMessageList(activity_tool.getSQLConnection())
try:
self.assertEqual(len([message
for message in result
......@@ -2205,8 +2205,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
1)
finally:
# Clear activities from all nodes
activity_tool.SQLBase_delMessage(table=SQLDict.sql_table,
uid=[message.uid for message in result])
activity.deleteMessageList(activity_tool.getSQLConnection(),
[message.uid for message in result])
self.commit()
def test_116_RaiseInCommitBeforeMessageExecution(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment