Commit c5edcba2 authored by Jean-Paul Smets's avatar Jean-Paul Smets

Moved sticky processing to readMessageList with delay var to reduce calls and potential conflicts


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@1302 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 94a4887a
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
import pickle, sys import pickle, sys
from Acquisition import aq_base from Acquisition import aq_base
from DateTime import DateTime
from Products.CMFActivity.ActivityTool import Message from Products.CMFActivity.ActivityTool import Message
from zLOG import LOG from zLOG import LOG
...@@ -81,6 +82,7 @@ class Queue: ...@@ -81,6 +82,7 @@ class Queue:
self.is_alive = {} self.is_alive = {}
self.is_awake = {} self.is_awake = {}
self.is_initialized = 0 self.is_initialized = 0
self.max_processing_date = DateTime()
def initialize(self, activity_tool): def initialize(self, activity_tool):
# This is the only moment when # This is the only moment when
......
...@@ -43,7 +43,7 @@ priority_weight = \ ...@@ -43,7 +43,7 @@ priority_weight = \
[3] * 10 + \ [3] * 10 + \
[4] * 5 + \ [4] * 5 + \
[5] * 1 [5] * 1
class ActivityFlushError(Exception): class ActivityFlushError(Exception):
"""Error during active message flush""" """Error during active message flush"""
...@@ -108,14 +108,12 @@ class SQLDict(RAMDict): ...@@ -108,14 +108,12 @@ class SQLDict(RAMDict):
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if hasattr(activity_tool,'SQLDict_readMessage'): if hasattr(activity_tool,'SQLDict_readMessage'):
now_date = DateTime() now_date = DateTime()
# Sticky processing messages should be set back to non processing
max_processing_date = now_date - MAX_PROCESSING_TIME
# Next processing date in case of error # Next processing date in case of error
next_processing_date = now_date + VALIDATION_ERROR_DELAY next_processing_date = now_date + VALIDATION_ERROR_DELAY
priority = random.choice(priority_weight) priority = random.choice(priority_weight)
# Try to find a message at given priority level which is scheduled for now # Try to find a message at given priority level which is scheduled for now
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority,
to_date=now_date, to_processing_date = max_processing_date) to_date=now_date)
if len(result) == 0: if len(result) == 0:
# If empty, take any message which is scheduled for now # If empty, take any message which is scheduled for now
priority = None priority = None
...@@ -261,18 +259,26 @@ class SQLDict(RAMDict): ...@@ -261,18 +259,26 @@ class SQLDict(RAMDict):
# YO: reading all lines might cause a deadlock # YO: reading all lines might cause a deadlock
message_list = [] message_list = []
if hasattr(activity_tool,'SQLDict_readMessageList'): if hasattr(activity_tool,'SQLDict_readMessageList'):
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None) result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node=None, to_processing_date=None)
for line in result: for line in result:
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
m.processing_node = line.processing_node m.processing_node = line.processing_node
m.priority = line.priority m.priority = line.priority
message_list.append(m) message_list.append(m)
return message_list return message_list
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
processing_node = 1 processing_node = 1
if hasattr(activity_tool,'SQLDict_readMessageList'): if hasattr(activity_tool,'SQLDict_readMessageList'):
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages now_date = DateTime()
if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
# Sticky processing messages should be set back to non processing
max_processing_date = now_date - MAX_PROCESSING_TIME
self.max_processing_date = now_date
else:
max_processing_date = None
result = activity_tool.SQLDict_readMessageList(path=None, method_id=None, processing_node = -1,
to_processing_date = max_processing_date) # Only assign non assigned messages
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
path_dict = {} path_dict = {}
for line in result: for line in result:
......
...@@ -11,17 +11,7 @@ class_file: ...@@ -11,17 +11,7 @@ class_file:
priority priority
to_date to_date
to_processing_date</params> to_processing_date</params>
<dtml-if to_processing_date>UPDATE message SELECT * FROM
SET
processing = 0
WHERE
processing = 1
AND
processing_date < <dtml-sqlvar to_processing_date type="string">
<dtml-var "'\0'">
</dtml-if>SELECT * FROM
message message
WHERE WHERE
processing <> 1 processing <> 1
......
...@@ -10,8 +10,19 @@ class_file: ...@@ -10,8 +10,19 @@ class_file:
<params>path <params>path
method_id method_id
processing_node processing_node
priority</params> priority
SELECT * FROM to_processing_date</params>
<dtml-if to_processing_date>UPDATE message
SET
processing = 0
WHERE
processing = 1
AND
processing_date < <dtml-sqlvar to_processing_date type="string">
<dtml-var "'\0'">
</dtml-if>SELECT * FROM
message message
WHERE WHERE
processing <> 1 processing <> 1
......
...@@ -207,10 +207,19 @@ class TestCMFActivity(ERP5TypeTestCase): ...@@ -207,10 +207,19 @@ class TestCMFActivity(ERP5TypeTestCase):
organisation.setTitle(self.title1) organisation.setTitle(self.title1)
organisation.activate(activity=activity).setTitle(self.title2) organisation.activate(activity=activity).setTitle(self.title2)
organisation.flushActivity(invoke=1) organisation.flushActivity(invoke=1)
self.assertEquals(organisation.getTitle(),self.title2)
get_transaction().commit() get_transaction().commit()
message_list = portal.portal_activities.getMessageList() message_list = portal.portal_activities.getMessageList()
self.assertEquals(len(message_list),0) self.assertEquals(len(message_list),0)
self.assertEquals(organisation.getTitle(),self.title2) self.assertEquals(organisation.getTitle(),self.title2)
# Try again with different commit order
organisation.setTitle(self.title1)
organisation.activate(activity=activity).setTitle(self.title2)
get_transaction().commit()
organisation.flushActivity(invoke=1)
self.assertEquals(len(message_list),0)
self.assertEquals(organisation.getTitle(),self.title2)
get_transaction().commit()
def TryActivateInsideFlush(self, activity): def TryActivateInsideFlush(self, activity):
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment