diff --git a/product/CMFActivity/Activity/Queue.py b/product/CMFActivity/Activity/Queue.py index 0228e6bdeedca12de092027c5bd74fe2d922b732..1b4a48515027185d402699d57e0a11c89e5a5fc0 100755 --- a/product/CMFActivity/Activity/Queue.py +++ b/product/CMFActivity/Activity/Queue.py @@ -40,9 +40,8 @@ INVALID_PATH = 1 INVALID_ORDER = 2 # Time global parameters -SECONDS_IN_DAY = 86400.0 -MAX_PROCESSING_TIME = 900 / SECONDS_IN_DAY # in fractions of day -VALIDATION_ERROR_DELAY = 30 / SECONDS_IN_DAY # in fractions of day +MAX_PROCESSING_TIME = 900 # in seconds +VALIDATION_ERROR_DELAY = 30 # in seconds class Queue: """ diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 74672b5fb199d20d45c9c9ff7b59c40446347334..7bb3aa7e615d1dfba8c0232347ed1e76b7ed09ca 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -29,11 +29,12 @@ import random from DateTime import DateTime from Products.CMFActivity.ActivityTool import registerActivity -from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY, SECONDS_IN_DAY +from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY from RAMDict import RAMDict from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from ZODB.POSException import ConflictError import sys +import sha try: from transaction import get as get_transaction @@ -71,7 +72,8 @@ class SQLDict(RAMDict): message = self.dumpMessage(m), date = m.activity_kw.get('at_date', DateTime()), group_method_id = m.activity_kw.get('group_method_id', ''), - tag = m.activity_kw.get('tag', '')) + tag = m.activity_kw.get('tag', ''), + order_validation_text = self.getOrderValidationText(m)) # Also store uid of activity def prepareQueueMessageList(self, activity_tool, message_list): @@ -90,6 +92,7 @@ class SQLDict(RAMDict): date_list = [message.activity_kw.get('at_date', datetime) for message in registered_message_list] group_method_id_list = [message.activity_kw.get('group_method_id', '') for message in registered_message_list] tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list] + order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list] activity_tool.SQLDict_writeMessageList( path_list = path_list, method_id_list = method_id_list, priority_list = priority_list, @@ -97,7 +100,8 @@ class SQLDict(RAMDict): message_list = dumped_message_list, date_list = date_list, group_method_id_list = group_method_id_list, - tag_list = tag_list) + tag_list = tag_list, + order_validation_text_list = order_validation_text_list) def prepareDeleteMessage(self, activity_tool, m): # Erase all messages in a single transaction @@ -140,7 +144,20 @@ class SQLDict(RAMDict): else: return () - def validateMessage(self, activity_tool, message, uid_list, priority, next_processing_date, retry): + def getOrderValidationText(self, message): + # Return an identifier of validators related to ordering. + order_validation_item_list = [] + key_list = message.activity_kw.keys() + key_list.sort() + for key in key_list: + method_id = "_validate_%s" % key + if hasattr(self, method_id): + order_validation_item_list.append((key, message.activity_kw[key])) + if len(order_validation_item_list) == 0: + return '' + return sha.new(repr(order_validation_item_list)).hexdigest() + + def validateMessage(self, activity_tool, message, uid_list, priority, processing_node): validation_state = message.validate(self, activity_tool) if validation_state is not VALID: if validation_state in (EXCEPTION, INVALID_PATH): @@ -155,14 +172,17 @@ class SQLDict(RAMDict): else: # Lower priority if len(uid_list) > 0: # Add some delay before new processing - activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date, - priority = priority + 1, retry = retry + 1) + activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, + priority = priority + 1, retry = 1) get_transaction().commit() # Release locks before starting a potentially long calculation else: # We do not lower priority for INVALID_ORDER errors but we do postpone execution - if len(uid_list) > 0: # Add some delay before new processing - activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date, - priority = priority, retry = retry + 1) + order_validation_text = self.getOrderValidationText(message) + activity_tool.SQLDict_setPriority(order_validation_text = order_validation_text, + processing_node = processing_node, + delay = VALIDATION_ERROR_DELAY, + retry = 1, + uid = None) get_transaction().commit() # Release locks before starting a potentially long calculation return 0 return 1 @@ -182,23 +202,16 @@ class SQLDict(RAMDict): if len(result) == 0: # If the result is still empty, shift the dates so that SQLDict can dispatch pending active # objects quickly. - self.timeShift(activity_tool, VALIDATION_ERROR_DELAY) + self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node) elif len(result) > 0: #LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result))) line = result[0] path = line.path method_id = line.method_id - try: - retry = int(line.retry) - except TypeError: - retry = 1 - # Next processing date in case of error - next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date ) uid_list = [x.uid for x in uid_list] uid_list_list = [uid_list] priority_list = [line.priority] - retry_list = [retry] # Make sure message can not be processed anylonger if len(uid_list) > 0: # Set selected messages to processing @@ -208,7 +221,7 @@ class SQLDict(RAMDict): m = self.loadMessage(line.message, uid = line.uid) message_list = [m] # Validate message (make sure object exists, priority OK, etc.) - if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date, retry): + if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): group_method_id = m.activity_kw.get('group_method_id') if group_method_id is not None: # Count the number of objects to prevent too many objects. @@ -234,12 +247,6 @@ class SQLDict(RAMDict): for line in result: path = line.path method_id = line.method_id - try: - retry = int(line.retry) - except TypeError: - retry = 1 - # Next processing date in case of error - next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date ) uid_list = [x.uid for x in uid_list] if len(uid_list) > 0: @@ -247,7 +254,7 @@ class SQLDict(RAMDict): activity_tool.SQLDict_processMessage(uid = uid_list) get_transaction().commit() # Release locks before starting a potentially long calculation m = self.loadMessage(line.message, uid = line.uid) - if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date, retry): + if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): if m.hasExpandMethod(): try: count += len(m.getObjectList(activity_tool)) @@ -262,7 +269,6 @@ class SQLDict(RAMDict): message_list.append(m) uid_list_list.append(uid_list) priority_list.append(line.priority) - retry_list.append(retry) if count >= MAX_GROUPED_OBJECTS: break @@ -293,9 +299,6 @@ class SQLDict(RAMDict): m = message_list[i] uid_list = uid_list_list[i] priority = priority_list[i] - retry = retry_list[i] - # Next processing date in case of error - next_processing_date = now_date + VALIDATION_ERROR_DELAY * retry if m.is_executed: activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it get_transaction().commit() # If successful, commit @@ -315,8 +318,8 @@ class SQLDict(RAMDict): else: # Lower priority if len(uid_list) > 0: - activity_tool.SQLDict_setPriority(uid = uid_list, date = next_processing_date, - priority = priority + 1, retry = retry + 1) + activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, + priority = priority + 1, retry = 1) get_transaction().commit() # Release locks before starting a potentially long calculation return 0 @@ -519,11 +522,11 @@ class SQLDict(RAMDict): return VALID # Required for tests (time shift) - def timeShift(self, activity_tool, delay): + def timeShift(self, activity_tool, delay, processing_node=None): """ To simulate timeShift, we simply substract delay from all dates in SQLDict message table """ - activity_tool.SQLDict_timeShift(delay = delay * SECONDS_IN_DAY) + activity_tool.SQLDict_timeShift(delay = delay, processing_node = processing_node) registerActivity(SQLDict) diff --git a/product/CMFActivity/Activity/SQLQueue.py b/product/CMFActivity/Activity/SQLQueue.py index 54e6c643bf7fbdfbf5eb7a0dc7b801ee9258b9f4..4b0465ff9c68bb09b57172a8d856762e9d2cbee6 100755 --- a/product/CMFActivity/Activity/SQLQueue.py +++ b/product/CMFActivity/Activity/SQLQueue.py @@ -30,7 +30,7 @@ import random from Products.CMFActivity.ActivityTool import registerActivity from RAMQueue import RAMQueue from DateTime import DateTime -from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY, SECONDS_IN_DAY +from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE try: @@ -314,11 +314,11 @@ class SQLQueue(RAMQueue): return VALID # Required for tests (time shift) - def timeShift(self, activity_tool, delay): + def timeShift(self, activity_tool, delay, processing_node = None): """ To simulate timeShift, we simply substract delay from all dates in SQLDict message table """ - activity_tool.SQLQueue_timeShift(delay = delay * SECONDS_IN_DAY) + activity_tool.SQLQueue_timeShift(delay = delay, processing_node = processing_node) registerActivity(SQLQueue) diff --git a/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql b/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql index d07f1d609d1cb651763f89ae5d864b121b4c39d3..7624682373d60a834abfdcceef0db37c4eb8ba65 100755 --- a/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql @@ -21,6 +21,7 @@ CREATE TABLE `message` ( `group_method_id` VARCHAR(255) DEFAULT '', `tag` VARCHAR(255), `retry` TINYINT UNSIGNED NOT NULL DEFAULT 0, + `order_validation_text` VARCHAR(255), `message` BLOB, PRIMARY KEY (`uid`), KEY `date` (`date`), @@ -30,5 +31,6 @@ CREATE TABLE `message` ( KEY `processing` (`processing`), KEY `processing_date` (`processing_date`), KEY `priority` (`priority`), - KEY `tag` (`tag`) + KEY `tag` (`tag`), + KEY `order_validation_text` (`order_validation_text`) ) TYPE = InnoDB; diff --git a/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql b/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql index e5bb6c54f4032ef8cb84bb7617e943d581b75a70..6d8de64aacc2b21540b316d1f5c70c9368007722 100755 --- a/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql @@ -26,10 +26,10 @@ AND message WHERE processing <> 1 -<dtml-if processing_node>AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> -<dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if> -<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if> -<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> +<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> +<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if> +<dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if> +<dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if> GROUP BY path, method_id, processing_node, processing ORDER BY diff --git a/product/CMFActivity/skins/activity/SQLDict_readUidList.zsql b/product/CMFActivity/skins/activity/SQLDict_readUidList.zsql index 968f89cc4bfbd3413058b785139399e793eb99b7..8614d9b93432fcec3794de5213c56f88d5a1d2b4 100755 --- a/product/CMFActivity/skins/activity/SQLDict_readUidList.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_readUidList.zsql @@ -16,6 +16,6 @@ SELECT uid FROM WHERE processing <> 1 <dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> -<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if> -<dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if> -<dtml-if to_date>AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if> +<dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if> +<dtml-if path> AND path = <dtml-sqlvar path type="string"> </dtml-if> +<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if> diff --git a/product/CMFActivity/skins/activity/SQLDict_setPriority.zsql b/product/CMFActivity/skins/activity/SQLDict_setPriority.zsql index 69854a24d5f8ed6aa81ec2e061833af9cbe54ee4..705a345c3280dab5693955e43bffd9176eb940db 100755 --- a/product/CMFActivity/skins/activity/SQLDict_setPriority.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_setPriority.zsql @@ -10,16 +10,37 @@ class_file: <params>uid:list priority date -retry</params> +retry +delay +processing_node +order_validation_text</params> UPDATE message SET - priority = <dtml-sqlvar priority type="int">, - processing = 0, - date = <dtml-sqlvar date type="datetime"> + processing = 0 +<dtml-if priority> + , priority = <dtml-sqlvar priority type="int"> +</dtml-if> +<dtml-if delay> + , date = DATE_ADD(NOW(), INTERVAL <dtml-sqlvar delay type="int"> * (retry + 1) SECOND) +<dtml-elif date> + , date = <dtml-sqlvar date type="datetime"> +</dtml-if> <dtml-if retry> - , retry = <dtml-sqlvar retry type="int"> + , retry = retry + <dtml-sqlvar retry type="int"> </dtml-if> WHERE -<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> - OR </dtml-if></dtml-in> + 1 = 1 +<dtml-if uid> + AND ( + <dtml-in uid> + uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> OR </dtml-if> + </dtml-in> + ) +</dtml-if> +<dtml-if processing_node> + AND processing_node = <dtml-sqlvar processing_node type="int"> +</dtml-if> +<dtml-if order_validation_text> + AND order_validation_text = <dtml-sqlvar order_validation_text type="string"> +</dtml-if> \ No newline at end of file diff --git a/product/CMFActivity/skins/activity/SQLDict_timeShift.zsql b/product/CMFActivity/skins/activity/SQLDict_timeShift.zsql index ac7cd9da0a7fc5f34fe15252af0cdafceff0873e..7dc29826b4d12c7758df7a9f900d05c235713f32 100755 --- a/product/CMFActivity/skins/activity/SQLDict_timeShift.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_timeShift.zsql @@ -7,9 +7,15 @@ cache_time:0 class_name: class_file: </dtml-comment> -<params>delay</params> +<params>delay +processing_node</params> UPDATE message SET - date = date - <dtml-sqlvar delay type="int">, - processing_date = processing_date - <dtml-sqlvar delay type="int"> + date = DATE_SUB(date, INTERVAL <dtml-sqlvar delay type="int"> SECOND), + processing_date = DATE_SUB(processing_date, INTERVAL <dtml-sqlvar delay type="int"> SECOND) +WHERE + 1 = 1 +<dtml-if processing_node> + AND processing_node = <dtml-sqlvar processing_node type="int"> +</dtml-if> \ No newline at end of file diff --git a/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql b/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql index 1f72be1907a2e5608a0df417eb7311e636a87cc5..6ba11dbbf114eb5d11ab366a2790421fc4770df8 100755 --- a/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql @@ -15,7 +15,8 @@ broadcast date processing_node=-1 group_method_id -tag</params> +tag +order_validation_text</params> INSERT INTO message SET path = <dtml-sqlvar path type="string">, @@ -27,4 +28,5 @@ SET broadcast = <dtml-sqlvar broadcast type="int">, group_method_id = <dtml-sqlvar group_method_id type="string">, tag = <dtml-sqlvar tag type="string">, + order_validation_text = <dtml-sqlvar order_validation_text type="string">, message = <dtml-sqlvar message type="string"> diff --git a/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql b/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql index 17a9b38cc34df71bf30961b88aa80a871ae4f6f2..929a79b8e35fd8d0a21376247481b320cbc043cd 100755 --- a/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_writeMessageList.zsql @@ -15,9 +15,10 @@ broadcast_list date_list processing_node_list group_method_id_list -tag_list</params> +tag_list +order_validation_text_list</params> INSERT INTO message -(path, date, method_id, processing_node, processing, priority, broadcast, group_method_id, tag, message) +(path, date, method_id, processing_node, processing, priority, broadcast, group_method_id, tag, order_validation_text, message) VALUES <dtml-in prefix="loop" expr="_.range(_.len(path_list))"> <dtml-if sequence-start><dtml-else>,</dtml-if> @@ -31,6 +32,7 @@ VALUES <dtml-sqlvar expr="broadcast_list[loop_item]" type="int">, <dtml-sqlvar expr="group_method_id_list[loop_item]" type="string">, <dtml-sqlvar expr="tag_list[loop_item]" type="string">, + <dtml-sqlvar expr="order_validation_text_list[loop_item]" type="string">, <dtml-sqlvar expr="message_list[loop_item]" type="string"> ) </dtml-in> diff --git a/product/CMFActivity/skins/activity/SQLQueue_timeShift.zsql b/product/CMFActivity/skins/activity/SQLQueue_timeShift.zsql index b2aa15243cbdf07abb70d7b63e1c42c80925878f..2762277685b58fc9e6bcbd3a5a47d2700f8f8f13 100755 --- a/product/CMFActivity/skins/activity/SQLQueue_timeShift.zsql +++ b/product/CMFActivity/skins/activity/SQLQueue_timeShift.zsql @@ -7,9 +7,15 @@ cache_time:0 class_name: class_file: </dtml-comment> -<params>delay</params> +<params>delay +processing_node</params> UPDATE message_queue SET date = date - <dtml-sqlvar delay type="int">, processing_date = processing_date - <dtml-sqlvar delay type="int"> +WHERE + 1 = 1 +<dtml-if processing_node> + AND processing_node = <dtml-sqlvar processing_node type="int"> +</dtml-if> \ No newline at end of file