diff --git a/product/CMFActivity/Activity/SQLDict.py b/product/CMFActivity/Activity/SQLDict.py index 5c44ea43fe5905b2d81939e5d1b4a58fc0b30330..2708e1b22b45cbcc2d894745951cdf8cb21f17d0 100755 --- a/product/CMFActivity/Activity/SQLDict.py +++ b/product/CMFActivity/Activity/SQLDict.py @@ -26,17 +26,25 @@ # ############################################################################## +import random from Products.CMFActivity.ActivityTool import registerActivity from RAMDict import RAMDict from zLOG import LOG -MAX_RETRY = 10 +MAX_RETRY = 5 DISTRIBUTABLE_STATE = -1 INVOKE_ERROR_STATE = -2 VALIDATE_ERROR_STATE = -3 +priority_weight = \ + [1] * 64 + \ + [2] * 20 + \ + [3] * 10 + \ + [4] * 5 + \ + [5] * 1 + class SQLDict(RAMDict): """ A simple OOBTree based queue. It should be compatible with transactions @@ -45,10 +53,18 @@ class SQLDict(RAMDict): """ def queueMessage(self, activity_tool, m): - activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , method_id = m.method_id, message = self.dumpMessage(m)) + activity_tool.SQLDict_writeMessage(path = '/'.join(m.object_path) , + method_id = m.method_id, + priority = m.activity_kw.get('priority', 1), + message = self.dumpMessage(m)) def dequeueMessage(self, activity_tool, processing_node): - result = activity_tool.SQLDict_readMessage(processing_node=processing_node) + priority = random.choice(priority_weight) + # Try to find a message at given priority level + result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) + if len(result) == 0: + # If empty, take any message + result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=None) if len(result) > 0: line = result[0] path = line.path @@ -57,7 +73,7 @@ class SQLDict(RAMDict): activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node = processing_node) get_transaction().commit() # Release locks before starting a potentially long calculation m = self.loadMessage(line.message) - if m.validate(self, activity_tool): + if m.validate(self, activity_tool): # We should validate each time XXX in case someone is deleting it at the same time retry = 0 while retry < MAX_RETRY: activity_tool.invoke(m) # Try to invoke the message @@ -122,7 +138,7 @@ class SQLDict(RAMDict): get_transaction().abort() # Abort and retry retry = retry + 1 if m.is_executed: # Make sure message could be invoked - activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=processing_node) # Delete it + activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=None) # Delete it if commit: get_transaction().commit() # If successful, commit else: if commit: get_transaction().abort() # If not, abort transaction and start a new one diff --git a/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql b/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql index 6bb9e0623e2daa777ce60828c80cc062c30b42e1..608ac8f6808b7777e951e2a98c1f90836d10a1f6 100755 --- a/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_createMessageTable.zsql @@ -13,9 +13,11 @@ CREATE TABLE `message` ( `method_id` VARCHAR(40), `processing_node` INT DEFAULT -1, `processing` INT DEFAULT 0, + `priority` INT DEFAULT 0, `message` BLOB, KEY `path` (`path`), KEY `method_id` (`method_id`), KEY `processing_node` (`processing_node`), KEY `processing` (`processing`), + KEY `priority` (`priority`), ) TYPE = InnoDB; diff --git a/product/CMFActivity/skins/activity/SQLDict_readMessage.zsql b/product/CMFActivity/skins/activity/SQLDict_readMessage.zsql index 558017fd2763a8cbee729f01960abd11985667c9..54374d4165ed3920b2c7153215a489ee56511944 100755 --- a/product/CMFActivity/skins/activity/SQLDict_readMessage.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_readMessage.zsql @@ -7,11 +7,12 @@ cache_time:0 class_name: class_file: </dtml-comment> -<params>processing_node</params> +<params>processing_node +priority</params> SELECT * FROM message WHERE processing = 0 -<dtml-if processing_node> -AND processing_node = <dtml-sqlvar processing_node type="int"> -</dtml-if> \ No newline at end of file +<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if> +<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if> + diff --git a/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql b/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql index f39ac54dbfea1c503cf0f989fb062203a5f3b99e..6d0c8ee869d1118373f80ddf80033a3c25e8d243 100755 --- a/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_readMessageList.zsql @@ -9,11 +9,13 @@ class_file: </dtml-comment> <params>path method_id -processing_node</params> +processing_node +priority</params> SELECT * FROM message WHERE processing = 0 <dtml-if processing_node>AND processing_node = <dtml-sqlvar processing_node type="int"> </dtml-if> +<dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if> <dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if> <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> diff --git a/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql b/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql index adb89c1eabee6efcda573faf5d230ab96c4572d7..be75351924ee86098578f7e661d55fd0dbda8993 100755 --- a/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql +++ b/product/CMFActivity/skins/activity/SQLDict_writeMessage.zsql @@ -9,7 +9,8 @@ class_file: </dtml-comment> <params>path method_id +priority message</params> INSERT INTO message VALUES - (<dtml-sqlvar path type="string">,<dtml-sqlvar method_id type="string">,-1,0,<dtml-sqlvar message type="string">); + (<dtml-sqlvar path type="string">,<dtml-sqlvar method_id type="string">,-1,0,<dtml-sqlvar priority type="int">,<dtml-sqlvar message type="string">);