Commit 467f5b34 authored by Sebastien Robin's avatar Sebastien Robin

- do not stock new ids used by activities into zodb, this improve

  a lot performance under very high load
- add a round_robin_scheduling parameter for SQLQueue so that
  all messages on a same path will not be processed on the same
  node


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@17268 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 14bf7238
...@@ -95,7 +95,8 @@ class SQLDict(RAMDict): ...@@ -95,7 +95,8 @@ class SQLDict(RAMDict):
for message in registered_message_list] for message in registered_message_list]
tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list] tag_list = [message.activity_kw.get('tag', '') for message in registered_message_list]
order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list] order_validation_text_list = [self.getOrderValidationText(message) for message in registered_message_list]
uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity', id_count=len(registered_message_list)) uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity',
id_count=len(registered_message_list), store=0)
activity_tool.SQLDict_writeMessageList( uid_list = uid_list, activity_tool.SQLDict_writeMessageList( uid_list = uid_list,
path_list = path_list, path_list = path_list,
method_id_list = method_id_list, method_id_list = method_id_list,
...@@ -484,12 +485,12 @@ class SQLDict(RAMDict): ...@@ -484,12 +485,12 @@ class SQLDict(RAMDict):
order_validation_text = line.order_validation_text) order_validation_text = line.order_validation_text)
self.getExecutableMessageList(activity_tool, message, message_dict, self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict) validation_text_dict)
# XXX probably this below can be optimized by assigning multiple messages at a time. # XXX probably this below can be optimized by assigning multiple messages at a time.
path_dict = {} path_dict = {}
assignMessage = activity_tool.SQLDict_assignMessage assignMessage = activity_tool.SQLDict_assignMessage
processing_node = LAST_PROCESSING_NODE processing_node = LAST_PROCESSING_NODE
id_tool = activity_tool.getPortalObject().portal_ids id_tool = activity_tool.getPortalObject().portal_ids
for message in message_dict.itervalues(): for message in message_dict.itervalues():
path = '/'.join(message.object_path) path = '/'.join(message.object_path)
broadcast = message.activity_kw.get('broadcast', 0) broadcast = message.activity_kw.get('broadcast', 0)
...@@ -499,7 +500,8 @@ class SQLDict(RAMDict): ...@@ -499,7 +500,8 @@ class SQLDict(RAMDict):
assignMessage(processing_node=1, uid=[uid]) assignMessage(processing_node=1, uid=[uid])
if node_count > 1: if node_count > 1:
uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity', uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity',
id_count=node_count - 1) id_count=node_count - 1,
store=0)
path_list = [path] * (node_count - 1) path_list = [path] * (node_count - 1)
method_id_list = [message.method_id] * (node_count - 1) method_id_list = [message.method_id] * (node_count - 1)
priority_list = [message.activity_kw.get('priority', 1)] * (node_count - 1) priority_list = [message.activity_kw.get('priority', 1)] * (node_count - 1)
......
...@@ -64,7 +64,7 @@ class SQLQueue(RAMQueue): ...@@ -64,7 +64,7 @@ class SQLQueue(RAMQueue):
def prepareQueueMessage(self, activity_tool, m): def prepareQueueMessage(self, activity_tool, m):
if m.is_registered: if m.is_registered:
id_tool = activity_tool.getPortalObject().portal_ids id_tool = activity_tool.getPortalObject().portal_ids
uid = id_tool.generateNewLengthId(id_group='portal_activity_queue') uid = id_tool.generateNewLengthId(id_group='portal_activity_queue', store=0)
activity_tool.SQLQueue_writeMessage(uid = uid, activity_tool.SQLQueue_writeMessage(uid = uid,
path = '/'.join(m.object_path) , path = '/'.join(m.object_path) ,
method_id = m.method_id, method_id = m.method_id,
...@@ -89,6 +89,10 @@ class SQLQueue(RAMQueue): ...@@ -89,6 +89,10 @@ class SQLQueue(RAMQueue):
next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400 next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
message_list = readMessage(processing_node=processing_node, to_date=now_date) message_list = readMessage(processing_node=processing_node, to_date=now_date)
for line in message_list: for line in message_list:
# Do not process many messages if there are long
new_date = DateTime()
if ((new_date-now_date)*86400) > 10:
break
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
# Make sure message can not be processed anylonger # Make sure message can not be processed anylonger
...@@ -331,7 +335,8 @@ class SQLQueue(RAMQueue): ...@@ -331,7 +335,8 @@ class SQLQueue(RAMQueue):
assignMessage(processing_node=1, uid=message.uid) assignMessage(processing_node=1, uid=message.uid)
if node_count > 1: if node_count > 1:
uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity_queue', uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity_queue',
id_count=node_count - 1) id_count=node_count - 1,
store=0)
priority = message.activity_kw.get('priority', 1) priority = message.activity_kw.get('priority', 1)
dumped_message = self.dumpMessage(message) dumped_message = self.dumpMessage(message)
date = message.activity_kw.get('at_date', now_date) date = message.activity_kw.get('at_date', now_date)
...@@ -352,9 +357,11 @@ class SQLQueue(RAMQueue): ...@@ -352,9 +357,11 @@ class SQLQueue(RAMQueue):
# the same node, so that object caching is more efficient. Otherwise, apply a round # the same node, so that object caching is more efficient. Otherwise, apply a round
# robin scheduling. # robin scheduling.
node = path_dict.get(path) node = path_dict.get(path)
round_robin_scheduling = message.activity_kw.get('round_robin_scheduling', 0)
if node is None: if node is None:
node = processing_node node = processing_node
path_dict[path] = node if not round_robin_scheduling:
path_dict[path] = node
processing_node += 1 processing_node += 1
if processing_node > node_count: if processing_node > node_count:
processing_node = 1 processing_node = 1
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment