diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py index c1d2dc083e6c3ff9c509bc90cdb630bdda5a85f0..d73fdc5c7efb196d813fe1668a50525f9b1da1f8 100644 --- a/product/CMFActivity/Activity/Queue.py +++ b/product/CMFActivity/Activity/Queue.py @@ -189,9 +189,6 @@ class Queue(object): elif cached_result: message_dict[message.uid] = message - def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw): - return 0 - def flush(self, activity_tool, object, **kw): pass @@ -216,14 +213,6 @@ class Queue(object): def getMessageList(self, activity_tool, processing_node=None,**kw): return [] - def countMessage(self, activity_tool,**kw): - return 0 - - def countMessageWithTag(self, activity_tool,value): - """Return the number of messages which match the given tag. - """ - return self.countMessage(activity_tool, tag=value) - # Transaction Management def prepareQueueMessageList(self, activity_tool, message_list): # Called to prepare transaction commit for queued messages diff --git a/product/CMFActivity/Activity/SQLBase.py b/product/CMFActivity/Activity/SQLBase.py index 78fdabf43c0e3177f940a98402f0fb6794ed7bad..34c7de9c514ff3370198314b865d54f504776256 100644 --- a/product/CMFActivity/Activity/SQLBase.py +++ b/product/CMFActivity/Activity/SQLBase.py @@ -115,18 +115,29 @@ def sqltest_dict(): return sqltest_dict sqltest_dict = sqltest_dict() +def getNow(db): + """ + Return the UTC date from the point of view of the SQL server. + Note that this value is not cached, and is not transactionnal on MySQL + side. + """ + return db.query("SELECT UTC_TIMESTAMP(6)", 0)[1][0][0] + class SQLBase(Queue): """ Define a set of common methods for SQL-based storage of activities. """ + _createMessageTable = 'SQLBase_createMessageTable' + def initialize(self, activity_tool, clear): folder = activity_tool.getPortalObject().portal_skins.activity try: - createMessageTable = folder.SQLBase_createMessageTable + createMessageTable = getattr(folder, self._createMessageTable) except AttributeError: return if clear: - folder.SQLBase_dropMessageTable(table=self.sql_table) + activity_tool.getSQLConnection().query( + "DROP TABLE IF EXISTS " + self.sql_table) createMessageTable(table=self.sql_table) else: src = createMessageTable._upgradeSchema(create_if_not_exists=1, @@ -185,33 +196,21 @@ class SQLBase(Queue): else: raise ValueError("Maximum retry for SQLBase_writeMessageList reached") - def getNow(self, context): - """ - Return the current value for SQL server's NOW(). - Note that this value is not cached, and is not transactionnal on MySQL - side. - """ - result = context.SQLBase_getNow() - assert len(result) == 1 - assert len(result[0]) == 1 - return result[0][0] - - def _getMessageList(self, activity_tool, count=1000, src__=0, **kw): + def _getMessageList(self, db, count=1000, src__=0, **kw): # XXX: Because most columns have NOT NULL constraint, conditions with None # value should be ignored, instead of trying to render them # (with comparisons with NULL). - sql_connection = activity_tool.getPortalObject().cmf_activity_sql_connection - q = sql_connection.sql_quote__ + q = db.string_literal sql = '\n AND '.join(sqltest_dict[k](v, q) for k, v in kw.iteritems()) sql = "SELECT * FROM %s%s\nORDER BY priority, date, uid%s" % ( self.sql_table, sql and '\nWHERE ' + sql, '' if count is None else '\nLIMIT %d' % count, ) - return sql if src__ else Results(sql_connection().query(sql, max_rows=0)) + return sql if src__ else Results(db.query(sql, max_rows=0)) - def getMessageList(self, *args, **kw): - result = self._getMessageList(*args, **kw) + def getMessageList(self, activity_tool, *args, **kw): + result = self._getMessageList(activity_tool.getSQLConnection(), *args, **kw) if type(result) is str: # src__ == 1 return result, class_name = self.__class__.__name__ @@ -223,57 +222,27 @@ class SQLBase(Queue): processing=line.processing) for line in result] - def countMessage(self, activity_tool, tag=None, path=None, - method_id=None, message_uid=None, **kw): - """Return the number of messages which match the given parameters. - """ - if isinstance(tag, str): - tag = [tag] - if isinstance(path, str): - path = [path] - if isinstance(method_id, str): - method_id = [method_id] - result = activity_tool.SQLBase_validateMessageList(table=self.sql_table, - method_id=method_id, - path=path, - message_uid=message_uid, - tag=tag, - serialization_tag=None, - count=1) - return result[0].uid_count - - def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, - active_process_uid=None, - only_invalid=False): - hasMessage = getattr(activity_tool, 'SQLBase_hasMessage', None) - if hasMessage is not None: - if object is None: - path = None - else: - path = '/'.join(object.getPhysicalPath()) - try: - result = hasMessage(table=self.sql_table, path=path, method_id=method_id, - only_valid=only_valid, active_process_uid=active_process_uid, - only_invalid=only_invalid) - except DatabaseError: - LOG( - 'SQLBase', - ERROR, - '%r raised, considering there are no activities' % ( - hasMessage, - ), - error=True, - ) - else: - return result[0].message_count > 0 - return 0 + def countMessageSQL(self, quote, **kw): + return "SELECT count(*) FROM %s WHERE processing_node > -10 AND %s" % ( + self.sql_table, " AND ".join( + sqltest_dict[k](v, quote) for (k, v) in kw.iteritems() if v + ) or "1") + + def hasActivitySQL(self, quote, only_valid=False, only_invalid=False, **kw): + where = [sqltest_dict[k](v, quote) for (k, v) in kw.iteritems() if v] + if only_valid: + where.append('processing_node > -2') + if only_invalid: + where.append('processing_node < -1') + return "SELECT 1 FROM %s WHERE %s LIMIT 1" % ( + self.sql_table, " AND ".join(where) or "1") def getPriority(self, activity_tool): - result = activity_tool.SQLBase_getPriority(table=self.sql_table) - if result: - result, = result - return result['priority'], result['date'] - return Queue.getPriority(self, activity_tool) + result = activity_tool.getSQLConnection().query( + "SELECT priority, date FROM %s" + " WHERE processing_node=0 AND date <= UTC_TIMESTAMP(6)" + " ORDER BY priority, date LIMIT 1" % self.sql_table, 0)[1] + return result[0] if result else Queue.getPriority(self, activity_tool) def _retryOnLockError(self, method, args=(), kw={}): while True: @@ -349,10 +318,8 @@ class SQLBase(Queue): error=severity>INFO and sys.exc_info() or None) def distribute(self, activity_tool, node_count): - assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None) - if assignMessage is None: - return - now_date = self.getNow(activity_tool) + db = activity_tool.getSQLConnection() + now_date = getNow(db) where_kw = { 'processing_node': -1, 'to_date': now_date, @@ -360,7 +327,7 @@ class SQLBase(Queue): } validated_count = 0 while 1: - result = self._getMessageList(activity_tool, **where_kw) + result = self._getMessageList(db, **where_kw) if not result: return transaction.commit() @@ -395,8 +362,7 @@ class SQLBase(Queue): distributable_uid_set.add(message.uid) distributable_count = len(distributable_uid_set) if distributable_count: - assignMessage(table=self.sql_table, - processing_node=0, uid=tuple(distributable_uid_set)) + self.unreserveMessageList(db, 0, distributable_uid_set) validated_count += distributable_count if validated_count >= MAX_VALIDATED_LIMIT: return @@ -404,7 +370,7 @@ class SQLBase(Queue): where_kw['from_date'] = line.date where_kw['above_uid'] = line.uid - def getReservedMessageList(self, activity_tool, date, processing_node, + def getReservedMessageList(self, db, date, processing_node, limit=None, group_method_id=None): """ Get and reserve a list of messages. @@ -418,29 +384,34 @@ class SQLBase(Queue): If None (or not given) no limit apply. """ assert limit + quote = db.string_literal + query = db.query + sql_group = ('' if group_method_id is None else + ' AND group_method_id=' + quote(group_method_id)) + + # Select reserved messages. # Do not check already-assigned messages when trying to reserve more # activities, because in such case we will find one reserved activity. - result = activity_tool.SQLBase_selectReservedMessageList( - table=self.sql_table, - count=limit, - processing_node=processing_node, - group_method_id=group_method_id, - ) + result = Results(query( + "SELECT * FROM %s WHERE processing_node=%s%s LIMIT %s" % ( + self.sql_table, processing_node, sql_group, limit), 0)) limit -= len(result) if limit: - reservable = activity_tool.SQLBase_getReservableMessageList( - table=self.sql_table, - count=limit, - processing_node=processing_node, - to_date=date, - group_method_id=group_method_id, - ) + # Get reservable messages. + # During normal operation, sorting by date (as last criteria) is fairer + # for users and reduce the probability to do the same work several times + # (think of an object that is modified several times in a short period of + # time). + reservable = Results(query( + "SELECT * FROM %s WHERE processing_node=0 AND %s%s" + " ORDER BY priority, date LIMIT %s FOR UPDATE" % ( + self.sql_table, sqltest_dict['to_date'](date, quote), sql_group, + limit), 0)) if reservable: - activity_tool.SQLBase_reserveMessageList( - uid=[x.uid for x in reservable], - table=self.sql_table, - processing_node=processing_node, - ) + # Reserve messages. + query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % ( + self.sql_table, processing_node, + ','.join(str(x.uid) for x in reservable))) # DC.ZRDB.Results.Results does not implement concatenation # Implement an imperfect (but cheap) concatenation. Do not update # __items__ nor _data_dictionary. @@ -449,15 +420,15 @@ class SQLBase(Queue): result._data += reservable._data return result - def makeMessageListAvailable(self, activity_tool, uid_list): + def unreserveMessageList(self, db, state, uid_list): """ - Put messages back in processing_node=0 . + Put messages back in given processing_node. """ - if len(uid_list): - activity_tool.SQLBase_makeMessageListAvailable(table=self.sql_table, - uid=uid_list) + db.query( + "UPDATE %s SET processing_node=%s, processing=0 WHERE uid IN (%s)\0" + "COMMIT" % (self.sql_table, state, ','.join(map(str, uid_list)))) - def getProcessableMessageLoader(self, activity_tool, processing_node): + def getProcessableMessageLoader(self, db, processing_node): # do not merge anything def load(line): uid = line.uid @@ -494,8 +465,9 @@ class SQLBase(Queue): - group_method_id - uid_to_duplicate_uid_list_dict """ + db = activity_tool.getSQLConnection() def getReservedMessageList(limit, group_method_id=None): - line_list = self.getReservedMessageList(activity_tool=activity_tool, + line_list = self.getReservedMessageList(db, date=now_date, processing_node=processing_node, limit=limit, @@ -503,12 +475,12 @@ class SQLBase(Queue): if line_list: self._log(TRACE, 'Reserved messages: %r' % [x.uid for x in line_list]) return line_list - now_date = self.getNow(activity_tool) + now_date = getNow(db) uid_to_duplicate_uid_list_dict = {} try: result = getReservedMessageList(1) if result: - load = self.getProcessableMessageLoader(activity_tool, processing_node) + load = self.getProcessableMessageLoader(db, processing_node) m, uid, uid_list = load(result[0]) message_list = [m] uid_to_duplicate_uid_list_dict[uid] = uid_list @@ -538,10 +510,14 @@ class SQLBase(Queue): message_list.append(m) if cost >= 1: # Unreserve extra messages as soon as possible. - self.makeMessageListAvailable(activity_tool=activity_tool, - uid_list=[line.uid for line in result if line.uid != uid]) - activity_tool.SQLBase_processMessage(table=self.sql_table, - uid=uid_to_duplicate_uid_list_dict.keys()) + uid_list = [line.uid for line in result if line.uid != uid] + if uid_list: + self.unreserveMessageList(db, 0, uid_list) + # Process messages. + db.query("UPDATE %s" + " SET processing=1, processing_date=UTC_TIMESTAMP(6)" + " WHERE uid IN (%s)\0COMMIT" % ( + self.sql_table, ','.join(map(str, uid_to_duplicate_uid_list_dict)))) return message_list, group_method_id, uid_to_duplicate_uid_list_dict except: self._log(WARNING, 'Exception while reserving messages.') @@ -550,8 +526,7 @@ class SQLBase(Queue): for uid_list in uid_to_duplicate_uid_list_dict.itervalues(): to_free_uid_list += uid_list try: - self.makeMessageListAvailable(activity_tool=activity_tool, - uid_list=to_free_uid_list) + self.unreserveMessageList(db, 0, to_free_uid_list) except: self._log(ERROR, 'Failed to free messages: %r' % to_free_uid_list) else: @@ -636,6 +611,18 @@ class SQLBase(Queue): transaction.commit() return not message_list + def deleteMessageList(self, db, uid_list): + db.query("DELETE FROM %s WHERE uid IN (%s)" % ( + self.sql_table, ','.join(map(str, uid_list)))) + + def reactivateMessageList(self, db, uid_list, delay, retry): + db.query("UPDATE %s SET" + " date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)" + "%s WHERE uid IN (%s)" % ( + self.sql_table, delay, + ", priority = priority + 1, retry = retry + 1" if retry else "", + ",".join(map(str, uid_list)))) + def finalizeMessageExecution(self, activity_tool, message_list, uid_to_duplicate_uid_list_dict=None): """ @@ -648,6 +635,7 @@ class SQLBase(Queue): be put in a permanent-error state. - In all other cases, retry count is increased and message is delayed. """ + db = activity_tool.getSQLConnection() deletable_uid_list = [] delay_uid_list = [] final_error_uid_list = [] @@ -692,10 +680,7 @@ class SQLBase(Queue): delay = VALIDATION_ERROR_DELAY * (retry * retry + 1) * 2 try: # Immediately update, because values different for every message - activity_tool.SQLBase_reactivate(table=self.sql_table, - uid=[uid], - delay=delay, - retry=1) + self.reactivateMessageList(db, (uid,), delay, True) except: self._log(WARNING, 'Failed to reactivate %r' % uid) make_available_uid_list.append(uid) @@ -709,9 +694,7 @@ class SQLBase(Queue): deletable_uid_list.append(uid) if deletable_uid_list: try: - self._retryOnLockError(activity_tool.SQLBase_delMessage, - kw={'table': self.sql_table, - 'uid': deletable_uid_list}) + self._retryOnLockError(self.deleteMessageList, (db, deletable_uid_list)) except: self._log(ERROR, 'Failed to delete messages %r' % deletable_uid_list) else: @@ -719,21 +702,19 @@ class SQLBase(Queue): if delay_uid_list: try: # If this is a conflict error, do not increase 'retry' but only delay. - activity_tool.SQLBase_reactivate(table=self.sql_table, - uid=delay_uid_list, delay=VALIDATION_ERROR_DELAY, retry=None) + self.reactivateMessageList(db, delay_uid_list, + VALIDATION_ERROR_DELAY, False) except: self._log(ERROR, 'Failed to delay %r' % delay_uid_list) if final_error_uid_list: try: - activity_tool.SQLBase_assignMessage(table=self.sql_table, - uid=final_error_uid_list, processing_node=INVOKE_ERROR_STATE) + self.unreserveMessageList(db, INVOKE_ERROR_STATE, final_error_uid_list) except: self._log(ERROR, 'Failed to set message to error state for %r' % final_error_uid_list) if make_available_uid_list: try: - self.makeMessageListAvailable(activity_tool=activity_tool, - uid_list=make_available_uid_list) + self.unreserveMessageList(db, 0, make_available_uid_list) except: self._log(ERROR, 'Failed to unreserve %r' % make_available_uid_list) else: @@ -783,13 +764,14 @@ class SQLBase(Queue): invoke(m) activity_tool.unregisterMessage(self, m) uid_list = [] - for line in self._getMessageList(activity_tool, path=path, processing=0, + db = activity_tool.getSQLConnection() + for line in self._getMessageList(db, path=path, processing=0, **({'method_id': method_id} if method_id else {})): uid_list.append(line.uid) if invoke: invoke(Message.load(line.message, uid=line.uid, line=line)) if uid_list: - activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list) + self.deleteMessageList(db, uid_list) # Required for tests def timeShift(self, activity_tool, delay, processing_node=None): @@ -797,5 +779,9 @@ class SQLBase(Queue): To simulate time shift, we simply substract delay from all dates in message(_queue) table """ - activity_tool.SQLBase_timeShift(table=self.sql_table, delay=delay, - processing_node=processing_node) + activity_tool.getSQLConnection().query("UPDATE %s SET" + " date = DATE_SUB(date, INTERVAL %s SECOND)," + " processing_date = DATE_SUB(processing_date, INTERVAL %s SECOND)" + % (self.sql_table, delay, delay) + + ('' if processing_node is None else + "WHERE processing_node=%s" % processing_node)) diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index bcd8cb979492de39231d80f8f9393a7bf104871d..98f224df9b42dda44889ff76147037dec66b9624 100644 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -26,6 +26,7 @@ # ############################################################################## +from Shared.DC.ZRDB.Results import Results from Products.CMFActivity.ActivityTool import Message import sys #from time import time @@ -74,8 +75,9 @@ class SQLDict(SQLBase): message_list = activity_buffer.getMessageList(self) return [m for m in message_list if m.is_registered] - def getProcessableMessageLoader(self, activity_tool, processing_node): + def getProcessableMessageLoader(self, db, processing_node): path_and_method_id_dict = {} + quote = db.string_literal def load(line): # getProcessableMessageList already fetch messages with the same # group_method_id, so what remains to be filtered on are path and @@ -87,6 +89,8 @@ class SQLDict(SQLBase): uid = line.uid original_uid = path_and_method_id_dict.get(key) if original_uid is None: + sql_method_id = " AND method_id = %s AND group_method_id = %s" % ( + quote(method_id), quote(line.group_method_id)) m = Message.load(line.message, uid=uid, line=line) merge_parent = m.activity_kw.get('merge_parent') try: @@ -101,11 +105,14 @@ class SQLDict(SQLBase): path_list.append(path) uid_list = [] if path_list: - result = activity_tool.SQLDict_selectParentMessage( - path=path_list, - method_id=method_id, - group_method_id=line.group_method_id, - processing_node=processing_node) + # Select parent messages. + result = Results(db.query("SELECT * FROM message" + " WHERE processing_node IN (0, %s) AND path IN (%s)%s" + " ORDER BY path LIMIT 1 FOR UPDATE" % ( + processing_node, + ','.join(map(quote, path_list)), + sql_method_id, + ), 0)) if result: # found a parent # mark child as duplicate uid_list.append(uid) @@ -115,29 +122,34 @@ class SQLDict(SQLBase): uid = line.uid m = Message.load(line.message, uid=uid, line=line) # return unreserved similar children - result = activity_tool.SQLDict_selectChildMessageList( - path=line.path, - method_id=method_id, - group_method_id=line.group_method_id) - reserve_uid_list = [x.uid for x in result] + path = line.path + result = db.query("SELECT uid FROM message" + " WHERE processing_node = 0 AND (path = %s OR path LIKE %s)" + "%s FOR UPDATE" % ( + quote(path), quote(path.replace('_', r'\_') + '/%'), + sql_method_id, + ), 0)[1] + reserve_uid_list = [x for x, in result] uid_list += reserve_uid_list if not line.processing_node: # reserve found parent reserve_uid_list.append(uid) else: - result = activity_tool.SQLDict_selectDuplicatedLineList( - path=path, - method_id=method_id, - group_method_id=line.group_method_id) - reserve_uid_list = uid_list = [x.uid for x in result] + # Select duplicates. + result = db.query("SELECT uid FROM message" + " WHERE processing_node = 0 AND path = %s%s FOR UPDATE" % ( + quote(path), sql_method_id, + ), 0)[1] + reserve_uid_list = uid_list = [x for x, in result] if reserve_uid_list: - activity_tool.SQLDict_reserveDuplicatedLineList( - processing_node=processing_node, uid=reserve_uid_list) - else: - activity_tool.SQLDict_commit() # release locks + db.query( + "UPDATE message SET processing_node=%s WHERE uid IN (%s)" % ( + processing_node, ','.join(map(str, reserve_uid_list)), + )) + db.query("COMMIT") except: - self._log(WARNING, 'getDuplicateMessageUidList got an exception') - activity_tool.SQLDict_rollback() # release locks + self._log(WARNING, 'Failed to reserve duplicates') + db.query("ROLLBACK") raise if uid_list: self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list) diff --git a/product/CMFActivity/Activity/SQLJoblib.py b/product/CMFActivity/Activity/SQLJoblib.py index cfdedd0bab23708ccfc9e3fe94e1517f384204ca..238f1b0f19e6c0b22d27aec2548b23446df01aa2 100644 --- a/product/CMFActivity/Activity/SQLJoblib.py +++ b/product/CMFActivity/Activity/SQLJoblib.py @@ -45,25 +45,7 @@ class SQLJoblib(SQLDict): sql_table = 'message_job' uid_group = 'portal_activity_job' - def initialize(self, activity_tool, clear): - """ - Initialize the message table using MYISAM Engine - """ - folder = activity_tool.getPortalObject().portal_skins.activity - try: - createMessageTable = folder.SQLJoblib_createMessageTable - except AttributeError: - return - if clear: - folder.SQLBase_dropMessageTable(table=self.sql_table) - createMessageTable() - else: - src = createMessageTable._upgradeSchema(create_if_not_exists=1, - initialize=self._initialize, - table=self.sql_table) - if src: - LOG('CMFActivity', INFO, "%r table upgraded\n%s" - % (self.sql_table, src)) + _createMessageTable = 'SQLJoblib_createMessageTable' def generateMessageUID(self, m): return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'), @@ -114,8 +96,9 @@ class SQLJoblib(SQLDict): else: raise ValueError("Maximum retry for SQLBase_writeMessageList reached") - def getProcessableMessageLoader(self, activity_tool, processing_node): + def getProcessableMessageLoader(self, db, processing_node): path_and_method_id_dict = {} + quote = db.string_literal def load(line): # getProcessableMessageList already fetch messages with the same # group_method_id, so what remains to be filtered on are path, method_id @@ -128,19 +111,23 @@ class SQLJoblib(SQLDict): if original_uid is None: m = Message.load(line.message, uid=uid, line=line) try: - result = activity_tool.SQLJoblib_selectDuplicatedLineList( - path=path, - method_id=method_id, - group_method_id=line.group_method_id, - signature=line.signature) - reserve_uid_list = uid_list = [x.uid for x in result] - if reserve_uid_list: - activity_tool.SQLBase_reserveMessageList( - table=self.sql_table, - processing_node=processing_node, - uid=reserve_uid_list) + # Select duplicates. + result = db.query("SELECT uid FROM message_job" + " WHERE processing_node = 0 AND path = %s AND signature = %s" + " AND method_id = %s AND group_method_id = %s FOR UPDATE" % ( + quote(path), quote(line.signature), + quote(method_id), quote(line.group_method_id), + ), 0)[1] + uid_list = [x for x, in result] + if uid_list: + db.query( + "UPDATE message_job SET processing_node=%s WHERE uid IN (%s)" % ( + processing_node, ','.join(map(str, uid_list)), + )) + db.query("COMMIT") except: - self._log(WARNING, 'getDuplicateMessageUidList got an exception') + self._log(WARNING, 'Failed to reserve duplicates') + db.query("ROLLBACK") raise if uid_list: self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list) diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 3a4aaee08df307c91496fe4f3100be29186ff4a5..52c286965cf79b82613b58bb9a4b41306c4e7be2 100644 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -655,11 +655,6 @@ class ActivityTool (BaseTool): activity_timing_log = False cancel_and_invoke_links_hidden = False - def SQLDict_setPriority(self, **kw): - real_SQLDict_setPriority = getattr(self.aq_parent, 'SQLDict_setPriority') - LOG('ActivityTool', 0, real_SQLDict_setPriority(src__=1, **kw)) - return real_SQLDict_setPriority(**kw) - # Filter content (ZMI)) def filtered_meta_types(self, user=None): # Filters the list of available meta types. @@ -670,6 +665,9 @@ class ActivityTool (BaseTool): meta_types.append(meta_type) return meta_types + def getSQLConnection(self): + return self.aq_inner.aq_parent.cmf_activity_sql_connection() + def maybeMigrateConnectionClass(self): connection_id = 'cmf_activity_sql_connection' sql_connection = getattr(self, connection_id, None) @@ -1127,14 +1125,16 @@ class ActivityTool (BaseTool): def hasActivity(self, *args, **kw): # Check in each queue if the object has deferred tasks # if not argument is provided, then check on self - if len(args) > 0: - obj = args[0] + if args: + obj, = args else: obj = self - for activity in activity_dict.itervalues(): - if activity.hasActivity(aq_inner(self), obj, **kw): - return True - return False + path = None if obj is None else '/'.join(obj.getPhysicalPath()) + db = self.getSQLConnection() + quote = db.string_literal + return bool(db.query("(%s)" % ") UNION ALL (".join( + activity.hasActivitySQL(quote, path=path, **kw) + for activity in activity_dict.itervalues()))[1]) security.declarePrivate('getActivityBuffer') def getActivityBuffer(self, create_if_not_found=True): @@ -1443,8 +1443,9 @@ class ActivityTool (BaseTool): """ if not(isinstance(message_uid_list, list)): message_uid_list = [message_uid_list] - self.SQLBase_makeMessageListAvailable(table=activity_dict[activity].sql_table, - uid=message_uid_list) + if message_uid_list: + activity_dict[activity].unreserveMessageList(self.getSQLConnection(), + 0, message_uid_list) if REQUEST is not None: return REQUEST.RESPONSE.redirect('%s/%s' % ( self.absolute_url(), 'view')) @@ -1470,8 +1471,8 @@ class ActivityTool (BaseTool): """ if not(isinstance(message_uid_list, list)): message_uid_list = [message_uid_list] - self.SQLBase_delMessage(table=activity_dict[activity].sql_table, - uid=message_uid_list) + activity_dict[activity].deleteMessageList( + self.getSQLConnection(), message_uid_list) if REQUEST is not None: return REQUEST.RESPONSE.redirect('%s/%s' % ( self.absolute_url(), 'view')) @@ -1523,10 +1524,7 @@ class ActivityTool (BaseTool): """ Return the number of messages which match the given tag. """ - message_count = 0 - for activity in activity_dict.itervalues(): - message_count += activity.countMessageWithTag(aq_inner(self), value) - return message_count + return self.countMessage(tag=value) security.declarePublic('countMessage') def countMessage(self, **kw): @@ -1540,10 +1538,11 @@ class ActivityTool (BaseTool): tag : activities with a particular tag message_uid : activities with a particular uid """ - message_count = 0 - for activity in activity_dict.itervalues(): - message_count += activity.countMessage(aq_inner(self), **kw) - return message_count + db = self.getSQLConnection() + quote = db.string_literal + return sum(x for x, in db.query("(%s)" % ") UNION ALL (".join( + activity.countMessageSQL(quote, **kw) + for activity in activity_dict.itervalues()))[1]) security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' ) def newActiveProcess(self, REQUEST=None, **kw): diff --git a/product/CMFActivity/skins/activity/SQLBase_assignMessage.zsql b/product/CMFActivity/skins/activity/SQLBase_assignMessage.zsql deleted file mode 100644 index 4173872ad8add0c982c09e8359cdef373990f401..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_assignMessage.zsql +++ /dev/null @@ -1,22 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -processing_node -uid:list -</params> -UPDATE - <dtml-var table> -SET - processing_node=<dtml-sqlvar processing_node type="int">, - processing=0 -WHERE - <dtml-sqltest uid type="int" multiple> -<dtml-var sql_delimiter> -COMMIT diff --git a/product/CMFActivity/skins/activity/SQLBase_delMessage.zsql b/product/CMFActivity/skins/activity/SQLBase_delMessage.zsql deleted file mode 100644 index ae7b97c4cc42efda496efa9e9c56cce9151df43b..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_delMessage.zsql +++ /dev/null @@ -1,16 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1000 -max_cache:100 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -uid:list -</params> -DELETE FROM - <dtml-var table> -WHERE - <dtml-sqltest uid type="int" multiple> diff --git a/product/CMFActivity/skins/activity/SQLBase_dropMessageTable.zsql b/product/CMFActivity/skins/activity/SQLBase_dropMessageTable.zsql deleted file mode 100644 index c6b54f29ab92dcb3a468e7b210461b9c309ef1e5..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_dropMessageTable.zsql +++ /dev/null @@ -1,11 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1000 -max_cache:100 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table</params> -DROP TABLE IF EXISTS <dtml-var table> diff --git a/product/CMFActivity/skins/activity/SQLBase_getNow.zsql b/product/CMFActivity/skins/activity/SQLBase_getNow.zsql deleted file mode 100644 index 9789ea90b9866650c286b2a28698f21e5118476a..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_getNow.zsql +++ /dev/null @@ -1,11 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params></params> -SELECT UTC_TIMESTAMP(6) diff --git a/product/CMFActivity/skins/activity/SQLBase_getPriority.zsql b/product/CMFActivity/skins/activity/SQLBase_getPriority.zsql deleted file mode 100644 index 5cd5be1478902bec7c4a6af6bff98351b224488e..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_getPriority.zsql +++ /dev/null @@ -1,18 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -</params> -SELECT `priority`, `date` FROM - <dtml-var table> -WHERE - processing_node = 0 - AND date <= UTC_TIMESTAMP(6) -ORDER BY priority, date -LIMIT 1 diff --git a/product/CMFActivity/skins/activity/SQLBase_getReservableMessageList.zsql b/product/CMFActivity/skins/activity/SQLBase_getReservableMessageList.zsql deleted file mode 100644 index ebee233c12fb228248f575bd2332c82875584b3e..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_getReservableMessageList.zsql +++ /dev/null @@ -1,34 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -processing_node -to_date -count -group_method_id -</params> -SELECT - * -FROM - <dtml-var table> -WHERE - processing_node=0 - AND date <= <dtml-sqlvar to_date type="datetime(6)"> - <dtml-if expr="group_method_id is not None"> - AND group_method_id = <dtml-sqlvar group_method_id type="string"> - </dtml-if> -ORDER BY -<dtml-comment> - During normal operation, sorting by date (as 2nd criteria) is fairer - for users and reduce the probability to do the same work several times - (think of an object that is modified several times in a short period of time). -</dtml-comment> - priority, date -LIMIT <dtml-sqlvar count type="int"> -FOR UPDATE diff --git a/product/CMFActivity/skins/activity/SQLBase_hasMessage.zsql b/product/CMFActivity/skins/activity/SQLBase_hasMessage.zsql deleted file mode 100644 index 2e6b81c79177650d205c56a79793078ed921f2a1..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_hasMessage.zsql +++ /dev/null @@ -1,23 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -path -method_id -active_process_uid -only_valid -only_invalid</params> -SELECT count(path) as message_count FROM - <dtml-var table> -WHERE 1 = 1 -<dtml-if expr="path is not None">AND path = <dtml-sqlvar path type="string"> </dtml-if> -<dtml-if expr="method_id is not None">AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> -<dtml-if expr="only_valid">AND processing_node > -2</dtml-if> -<dtml-if expr="only_invalid">AND processing_node < -1</dtml-if> -<dtml-if expr="active_process_uid is not None"> AND active_process_uid = <dtml-sqlvar active_process_uid type="int"> </dtml-if> diff --git a/product/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql b/product/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql deleted file mode 100644 index 2ace1255402b52c5a4daa38a5ff4a18a8ac76324..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_makeMessageListAvailable.zsql +++ /dev/null @@ -1,20 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -uid</params> -UPDATE - <dtml-var table> -SET - processing_node=0, - processing=0 -WHERE - <dtml-sqltest uid type="int" multiple> -<dtml-var sql_delimiter> -COMMIT diff --git a/product/CMFActivity/skins/activity/SQLBase_processMessage.zsql b/product/CMFActivity/skins/activity/SQLBase_processMessage.zsql deleted file mode 100644 index 67c0fefc9f7cd7b00dbe33eebfdee3fd5c0fbac9..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_processMessage.zsql +++ /dev/null @@ -1,20 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -uid</params> -UPDATE - <dtml-var table> -SET - processing_date = UTC_TIMESTAMP(6), - processing = 1 -WHERE - <dtml-sqltest uid type="int" multiple> -<dtml-var sql_delimiter> -COMMIT diff --git a/product/CMFActivity/skins/activity/SQLBase_reactivate.zsql b/product/CMFActivity/skins/activity/SQLBase_reactivate.zsql deleted file mode 100644 index d3ff103b5525f1dfe04ab3d010bc2dad02ce8e6e..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_reactivate.zsql +++ /dev/null @@ -1,25 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1000 -max_cache:100 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -uid:list -retry -delay -</params> -UPDATE - <dtml-var table> -SET - date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL - <dtml-sqlvar delay type="int"> SECOND) -<dtml-if expr="retry is not None"> - , priority = priority + <dtml-sqlvar retry type="int"> - , retry = retry + <dtml-sqlvar retry type="int"> -</dtml-if> -WHERE - <dtml-sqltest uid type="int" multiple> diff --git a/product/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql b/product/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql deleted file mode 100644 index 6b954815746bc2cefc7ccf930a869797e3d31b61..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_reserveMessageList.zsql +++ /dev/null @@ -1,21 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -processing_node -uid -</params> -UPDATE - <dtml-var table> -SET - processing_node=<dtml-sqlvar processing_node type="int"> -WHERE - <dtml-sqltest uid type="int" multiple> -<dtml-var sql_delimiter> -COMMIT diff --git a/product/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql b/product/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql deleted file mode 100644 index d687042f02abca59826ed4d3bd940cbc6c8a990b..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_selectReservedMessageList.zsql +++ /dev/null @@ -1,25 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -processing_node -group_method_id -count</params> -SELECT - * -FROM - <dtml-var table> -WHERE - processing_node = <dtml-sqlvar processing_node type="int"> -<dtml-if expr="group_method_id is not None"> - AND group_method_id = <dtml-sqlvar group_method_id type="string"> -</dtml-if> -<dtml-if expr="count is not None"> - LIMIT <dtml-sqlvar count type="int"> -</dtml-if> diff --git a/product/CMFActivity/skins/activity/SQLBase_timeShift.zsql b/product/CMFActivity/skins/activity/SQLBase_timeShift.zsql deleted file mode 100644 index d66b270f991ce26d8adf08ebbd825cbe3d18ed6b..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLBase_timeShift.zsql +++ /dev/null @@ -1,20 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params>table -delay -processing_node</params> -UPDATE - <dtml-var table> -SET - date = DATE_SUB(date, INTERVAL <dtml-sqlvar delay type="int"> SECOND), - processing_date = DATE_SUB(processing_date, INTERVAL <dtml-sqlvar delay type="int"> SECOND) -<dtml-if expr="processing_node is not None"> -WHERE <dtml-sqltest processing_node type="int"> -</dtml-if> diff --git a/product/CMFActivity/skins/activity/SQLDict_commit.zsql b/product/CMFActivity/skins/activity/SQLDict_commit.zsql deleted file mode 100644 index bdbf5b302f8e5ef7eac802ebcd6399dd88635ac9..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLDict_commit.zsql +++ /dev/null @@ -1,11 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1000 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params></params> -COMMIT diff --git a/product/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql b/product/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql deleted file mode 100644 index e47f0e95701d4141cf918b82492119b92a064933..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLDict_reserveDuplicatedLineList.zsql +++ /dev/null @@ -1,21 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params> -processing_node -uid -</params> -UPDATE - message -SET - processing_node=<dtml-sqlvar processing_node type="int"> -WHERE - <dtml-sqltest uid type="int" multiple> -<dtml-var sql_delimiter> -COMMIT diff --git a/product/CMFActivity/skins/activity/SQLDict_rollback.zsql b/product/CMFActivity/skins/activity/SQLDict_rollback.zsql deleted file mode 100644 index cb0b004af20cd1e9447d09f3f0deb0156898f6ae..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLDict_rollback.zsql +++ /dev/null @@ -1,11 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:1000 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params></params> -ROLLBACK diff --git a/product/CMFActivity/skins/activity/SQLDict_selectChildMessageList.zsql b/product/CMFActivity/skins/activity/SQLDict_selectChildMessageList.zsql deleted file mode 100644 index 45fd52fabb5bfb87747fc9e23c34b41cbf9e279c..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLDict_selectChildMessageList.zsql +++ /dev/null @@ -1,24 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params> -path -method_id -group_method_id -</params> -SELECT uid FROM - message -WHERE - processing_node = 0 - AND (path = <dtml-sqlvar path type="string"> - OR path LIKE <dtml-sqlvar type="string" - expr="path.replace('_', r'\_') + '/%'">) - AND method_id = <dtml-sqlvar method_id type="string"> - AND group_method_id = <dtml-sqlvar group_method_id type="string"> -FOR UPDATE diff --git a/product/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql b/product/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql deleted file mode 100644 index c65326688bc11a573e04aad26e661fc514d1b857..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLDict_selectDuplicatedLineList.zsql +++ /dev/null @@ -1,22 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params> -path -method_id -group_method_id -</params> -SELECT uid FROM - message -WHERE - processing_node = 0 - AND path = <dtml-sqlvar path type="string"> - AND method_id = <dtml-sqlvar method_id type="string"> - AND group_method_id = <dtml-sqlvar group_method_id type="string"> -FOR UPDATE diff --git a/product/CMFActivity/skins/activity/SQLDict_selectParentMessage.zsql b/product/CMFActivity/skins/activity/SQLDict_selectParentMessage.zsql deleted file mode 100644 index a6a3b663bf376cdc73c5eab5e5f97a8c5704cefd..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLDict_selectParentMessage.zsql +++ /dev/null @@ -1,25 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params> -path -method_id -group_method_id -processing_node -</params> -SELECT * FROM - message -WHERE - processing_node IN (0, <dtml-sqlvar processing_node type="int">) - AND <dtml-sqltest path type="string" multiple> - AND method_id = <dtml-sqlvar method_id type="string"> - AND group_method_id = <dtml-sqlvar group_method_id type="string"> -ORDER BY path -LIMIT 1 -FOR UPDATE diff --git a/product/CMFActivity/skins/activity/SQLJoblib_selectDuplicatedLineList.zsql b/product/CMFActivity/skins/activity/SQLJoblib_selectDuplicatedLineList.zsql deleted file mode 100644 index cecc760cd69b707a39e1bbc337e23bfcc69c9fe4..0000000000000000000000000000000000000000 --- a/product/CMFActivity/skins/activity/SQLJoblib_selectDuplicatedLineList.zsql +++ /dev/null @@ -1,24 +0,0 @@ -<dtml-comment> -title: -connection_id:cmf_activity_sql_connection -max_rows:0 -max_cache:0 -cache_time:0 -class_name: -class_file: -</dtml-comment> -<params> -path -method_id -group_method_id -signature -</params> -SELECT uid FROM - message_job -WHERE - processing_node = 0 - AND path = <dtml-sqlvar path type="string"> - AND method_id = <dtml-sqlvar method_id type="string"> - AND group_method_id = <dtml-sqlvar group_method_id type="string"> - AND signature = <dtml-sqlvar signature type="string"> -FOR UPDATE diff --git a/product/CMFActivity/tests/testCMFActivity.py b/product/CMFActivity/tests/testCMFActivity.py index 51df237d2adfce9f8109022d5b65608e9a5cdde4..750aa860b60663fa06c60d75860d0b0e65d492c2 100644 --- a/product/CMFActivity/tests/testCMFActivity.py +++ b/product/CMFActivity/tests/testCMFActivity.py @@ -2184,7 +2184,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): activity.getProcessableMessageList(activity_tool, 3) self.commit() - result = activity._getMessageList(activity_tool) + result = activity._getMessageList(activity_tool.getSQLConnection()) try: self.assertEqual(len([message for message in result @@ -2205,8 +2205,8 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor): 1) finally: # Clear activities from all nodes - activity_tool.SQLBase_delMessage(table=SQLDict.sql_table, - uid=[message.uid for message in result]) + activity.deleteMessageList(activity_tool.getSQLConnection(), + [message.uid for message in result]) self.commit() def test_116_RaiseInCommitBeforeMessageExecution(self):