Commit e48e00fc authored by Yoshinori Okuji's avatar Yoshinori Okuji

Limit the max number of grouped objects.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@3856 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 0ba37050
...@@ -32,10 +32,12 @@ from Products.CMFActivity.ActivityTool import registerActivity ...@@ -32,10 +32,12 @@ from Products.CMFActivity.ActivityTool import registerActivity
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY, SECONDS_IN_DAY from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY, SECONDS_IN_DAY
from RAMDict import RAMDict from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from ZODB.POSException import ConflictError
from zLOG import LOG from zLOG import LOG
MAX_PRIORITY = 5 MAX_PRIORITY = 5
MAX_GROUPED_OBJECTS = 300
priority_weight = \ priority_weight = \
[1] * 64 + \ [1] * 64 + \
...@@ -191,25 +193,51 @@ class SQLDict(RAMDict): ...@@ -191,25 +193,51 @@ class SQLDict(RAMDict):
if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date): if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date):
group_method_id = m.activity_kw.get('group_method_id') group_method_id = m.activity_kw.get('group_method_id')
if group_method_id is not None: if group_method_id is not None:
# Count the number of objects to prevent too many objects.
if m.hasExpandMethod():
try:
count = len(m.getObjectList())
except ConflictError:
raise
except:
# Here, simply ignore an exception. The same exception should be handled later.
count = 0
else:
count = 1
group_method = activity_tool.restrictedTraverse(group_method_id) group_method = activity_tool.restrictedTraverse(group_method_id)
# Retrieve objects which have the same group method.
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority, if count < MAX_GROUPED_OBJECTS:
to_date=now_date, group_method_id=group_method_id) # Retrieve objects which have the same group method.
#LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result))) result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority,
for line in result: to_date=now_date, group_method_id=group_method_id)
path = line.path #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
method_id = line.method_id for line in result:
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date ) path = line.path
uid_list = [x.uid for x in uid_list] method_id = line.method_id
if len(uid_list) > 0: uid_list = activity_tool.SQLDict_readUidList( path=path, method_id=method_id, processing_node=None, to_date=now_date )
# Set selected messages to processing uid_list = [x.uid for x in uid_list]
activity_tool.SQLDict_processMessage(uid = uid_list) if len(uid_list) > 0:
get_transaction().commit() # Release locks before starting a potentially long calculation # Set selected messages to processing
m = self.loadMessage(line.message, uid = line.uid) activity_tool.SQLDict_processMessage(uid = uid_list)
if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date): get_transaction().commit() # Release locks before starting a potentially long calculation
message_list.append(m) m = self.loadMessage(line.message, uid = line.uid)
uid_list_list.append(uid_list) if self.validateMessage(activity_tool, m, uid_list, line.priority, next_processing_date):
priority_list.append(line.priority) if m.hasExpandMethod():
try:
count += len(m.getObjectList())
except ConflictError:
raise
except:
# Here, simply ignore an exception. The same exception should be handled later.
pass
else:
count += 1
message_list.append(m)
uid_list_list.append(uid_list)
priority_list.append(line.priority)
if count >= MAX_GROUPED_OBJECTS:
break
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
# Try to invoke # Try to invoke
......
...@@ -85,20 +85,23 @@ class Message: ...@@ -85,20 +85,23 @@ class Message:
self.kw = kw self.kw = kw
self.is_executed = 0 self.is_executed = 0
self.user_name = str(_getAuthenticatedUser(self)) self.user_name = str(_getAuthenticatedUser(self))
self.object_list = None
# Store REQUEST Info ? # Store REQUEST Info ?
def getObject(self, activity_tool): def getObject(self, activity_tool):
return activity_tool.unrestrictedTraverse(self.object_path) return activity_tool.unrestrictedTraverse(self.object_path)
def getObjectList(self, activity_tool): def getObjectList(self, activity_tool):
try: if self.object_list is not None:
expand_method_id = self.activity_kw['expand_method_id'] try:
except KeyError: expand_method_id = self.activity_kw['expand_method_id']
return [self.getObject()] obj = self.getObject(activity_tool)
# FIXME: how to pass parameters?
self.object_list = getattr(obj, expand_method_id)()
except KeyError:
self.object_list = [self.getObject(activity_tool)]
obj = self.getObject(activity_tool) return self.object_list
# FIXME: how to pass parameters?
return getattr(obj, expand_method_id)()
def hasExpandMethod(self): def hasExpandMethod(self):
return self.activity_kw.has_key('expand_method_id') return self.activity_kw.has_key('expand_method_id')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment