Commit 6fd63363 authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: new 'group_method_cost' activity parameter

It defaults to .01 and must be in the interval ]0, 1]
If a message expands to N objects, then it costs N / group_method_cost
and activity tool stop reserving messages the sum of all costs exceeds 1.

By default, this preserves original behaviour, i.e. MAX_GROUPED_OBJECTS = 100

Value 1 is not rejected to allow generic coding
(ex: activity_kw['group_method_cost'] = 1. / packet_size)
parent d5e23707
......@@ -38,10 +38,6 @@ from Products.CMFActivity.ActivityRuntimeEnvironment import (
ActivityRuntimeEnvironment, getTransactionalVariable)
from Queue import VALIDATION_ERROR_DELAY
# Stop electing more messages for processing if more than this number of
# objects are impacted by elected messages.
MAX_GROUPED_OBJECTS = 100
def sort_message_key(message):
# same sort key as in SQL{Dict,Queue}_readMessageList
return message.line.priority, message.line.date, message.uid
......@@ -176,11 +172,11 @@ class SQLBase:
- reserve a message
- set reserved message to processing=1 state
- if this message has a group_method_id:
- reserve a bunch of BUNDLE_MESSAGE_COUNT messages
- untill number of impacted objects goes over MAX_GROUPED_OBJECTS
- reserve a bunch of messages
- until the total "cost" of the group goes over 1
- get one message from the reserved bunch (this messages will be
"needed")
- increase the number of impacted object
- update the total cost
- set "needed" reserved messages to processing=1 state
- unreserve "unneeded" messages
- return still-reserved message list and a group_method_id
......@@ -193,7 +189,6 @@ class SQLBase:
Returned values:
4-tuple:
- list of messages
- impacted object count
- group_method_id
- uid_to_duplicate_uid_list_dict
"""
......@@ -212,10 +207,8 @@ class SQLBase:
if len(uid_list):
self._log(TRACE, 'Reserved duplicate messages: %r' % (uid_list, ))
return uid_list
BUNDLE_MESSAGE_COUNT = 100 # Arbitrary number
now_date = self.getNow(activity_tool)
message_list = []
count = 0
group_method_id = None
try:
result = getReservedMessageList(limit=1)
......@@ -231,10 +224,16 @@ class SQLBase:
.extend(getDuplicateMessageUidList(line))
if group_method_id not in (None, '', '\0'):
# Count the number of objects to prevent too many objects.
count += len(m.getObjectList(activity_tool))
if count < MAX_GROUPED_OBJECTS:
cost = m.activity_kw.get('group_method_cost', .01)
assert 0 < cost <= 1, (self.sql_table, uid)
count = len(m.getObjectList(activity_tool))
# this is heuristic (messages with same group_method_id
# are likely to have the same group_method_cost)
limit = int(1. / cost + 1 - count)
if limit > 1: # <=> cost * count < 1
cost *= count
# Retrieve objects which have the same group method.
result = getReservedMessageList(limit=BUNDLE_MESSAGE_COUNT,
result = getReservedMessageList(limit=limit,
group_method_id=group_method_id)
path_and_method_id_dict = {}
unreserve_uid_list = []
......@@ -260,9 +259,10 @@ class SQLBase:
path_and_method_id_dict[key] = line.uid
uid_to_duplicate_uid_list_dict.setdefault(line.uid, []) \
.extend(getDuplicateMessageUidList(line))
if count < MAX_GROUPED_OBJECTS:
if cost < 1:
m = self.loadMessage(line.message, uid=line.uid, line=line)
count += len(m.getObjectList(activity_tool))
cost += len(m.getObjectList(activity_tool)) * \
m.activity_kw.get('group_method_cost', .01)
message_list.append(m)
else:
unreserve_uid_list.append(line.uid)
......@@ -271,8 +271,7 @@ class SQLBase:
# Unreserve extra messages as soon as possible.
self.makeMessageListAvailable(activity_tool=activity_tool,
uid_list=unreserve_uid_list)
return (message_list, count, group_method_id,
uid_to_duplicate_uid_list_dict)
return message_list, group_method_id, uid_to_duplicate_uid_list_dict
except:
self._log(WARNING, 'Exception while reserving messages.')
if len(message_list):
......@@ -298,7 +297,7 @@ class SQLBase:
final_uid_list.extend(uid_to_duplicate_uid_list_dict.get(uid, []))
self.makeMessageListAvailable(activity_tool=activity_tool,
uid_list=final_uid_list)
message_list, count, group_method_id, uid_to_duplicate_uid_list_dict = \
message_list, group_method_id, uid_to_duplicate_uid_list_dict = \
self.getProcessableMessageList(activity_tool, processing_node)
if message_list:
# Remove group_id parameter from group_method_id
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment