Commit 2a48f66f authored by Yoshinori Okuji's avatar Yoshinori Okuji

Do not refer to the activity tool in _finish.

Instead, store the path in ActivityBuffer in __init__.
Make the parameter activity_tool to __init__ in ActivityBuffer obligatory.
Some performance tuning.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@8153 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent a6e65ddc
...@@ -194,7 +194,7 @@ class Queue: ...@@ -194,7 +194,7 @@ class Queue:
# Called to prepare transaction commit for queued messages # Called to prepare transaction commit for queued messages
pass pass
def finishQueueMessage(self, activity_tool, m): def finishQueueMessage(self, activity_tool_path, m):
# Called to commit queued messages # Called to commit queued messages
pass pass
...@@ -202,7 +202,7 @@ class Queue: ...@@ -202,7 +202,7 @@ class Queue:
# Called to prepare transaction commit for deleted messages # Called to prepare transaction commit for deleted messages
pass pass
def finishDeleteMessage(self, activity_tool, m): def finishDeleteMessage(self, activity_tool_path, m):
# Called to commit deleted messages # Called to commit deleted messages
pass pass
......
...@@ -50,20 +50,17 @@ class RAMDict(Queue): ...@@ -50,20 +50,17 @@ class RAMDict(Queue):
Queue.__init__(self) Queue.__init__(self)
self.queue_dict = {} self.queue_dict = {}
def getDict(self, activity_tool): def getDict(self, activity_tool_path):
path = activity_tool.getPhysicalPath() return self.queue_dict.setdefault(activity_tool_path, {})
if not self.queue_dict.has_key(path):
self.queue_dict[path] = {}
return self.queue_dict[path]
def finishQueueMessage(self, activity_tool, m): def finishQueueMessage(self, activity_tool_path, m):
if m.is_registered: if m.is_registered:
self.getDict(activity_tool)[(tuple(m.object_path), m.method_id)] = m self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)] = m
def finishDeleteMessage(self, activity_tool, message): def finishDeleteMessage(self, activity_tool_path, message):
for key, m in self.getDict(activity_tool).items(): for key, m in self.getDict(activity_tool_path).items():
if m.object_path == message.object_path and m.method_id == message.method_id: if m.object_path == message.object_path and m.method_id == message.method_id:
del self.getDict(activity_tool)[(tuple(m.object_path), m.method_id)] del self.getDict(activity_tool_path)[(tuple(m.object_path), m.method_id)]
def registerActivityBuffer(self, activity_buffer): def registerActivityBuffer(self, activity_buffer):
class_name = self.__class__.__name__ class_name = self.__class__.__name__
...@@ -81,13 +78,14 @@ class RAMDict(Queue): ...@@ -81,13 +78,14 @@ class RAMDict(Queue):
m.is_registered = 1 m.is_registered = 1
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
if len(self.getDict(activity_tool).keys()) is 0: path = activity_tool.getPhysicalPath()
if len(self.getDict(path).keys()) is 0:
return 1 # Go to sleep return 1 # Go to sleep
for key, m in self.getDict(activity_tool).items(): for key, m in self.getDict(path).items():
if m.validate(self, activity_tool) is VALID: if m.validate(self, activity_tool) is VALID:
activity_tool.invoke(m) activity_tool.invoke(m)
if m.is_executed: if m.is_executed:
del self.getDict(activity_tool)[key] del self.getDict(path)[key]
get_transaction().commit() get_transaction().commit()
return 0 return 0
else: else:
...@@ -101,7 +99,8 @@ class RAMDict(Queue): ...@@ -101,7 +99,8 @@ class RAMDict(Queue):
else: else:
object_path = None object_path = None
active_process = kw.get('active_process', None) active_process = kw.get('active_process', None)
for m in self.getDict(activity_tool).values(): path = activity_tool.getPhysicalPath()
for m in self.getDict(path).values():
# Filter active process and path if defined # Filter active process and path if defined
if active_process is None or m.active_process == active_process: if active_process is None or m.active_process == active_process:
if object_path is None or m.object_path == object_path: if object_path is None or m.object_path == object_path:
...@@ -138,7 +137,8 @@ class RAMDict(Queue): ...@@ -138,7 +137,8 @@ class RAMDict(Queue):
method_dict[m.method_id] = 1 method_dict[m.method_id] = 1
activity_tool.unregisterMessage(self, m) activity_tool.unregisterMessage(self, m)
# Parse each message in RAM dict # Parse each message in RAM dict
for key, m in self.getDict(activity_tool).items(): path = activity_tool.getPhysicalPath()
for key, m in self.getDict(path).items():
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if not method_dict.has_key(m.method_id): if not method_dict.has_key(m.method_id):
LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path)) LOG('CMFActivity RAMDict: ', 0, 'flushing object %s' % '/'.join(m.object_path))
...@@ -155,7 +155,8 @@ class RAMDict(Queue): ...@@ -155,7 +155,8 @@ class RAMDict(Queue):
def getMessageList(self, activity_tool, processing_node=None,**kw): def getMessageList(self, activity_tool, processing_node=None,**kw):
new_queue = [] new_queue = []
for m in self.getDict(activity_tool).values(): path = activity_tool.getPhysicalPath()
for m in self.getDict(path).values():
m.processing_node = 1 m.processing_node = 1
m.priority = 0 m.priority = 0
new_queue.append(m) new_queue.append(m)
......
...@@ -44,22 +44,19 @@ class RAMQueue(Queue): ...@@ -44,22 +44,19 @@ class RAMQueue(Queue):
self.queue_dict = {} self.queue_dict = {}
self.last_uid = 0 self.last_uid = 0
def getQueue(self, activity_tool): def getQueue(self, activity_tool_path):
path = activity_tool.getPhysicalPath() return self.queue_dict.setdefault(activity_tool_path, [])
if not self.queue_dict.has_key(path):
self.queue_dict[path] = []
return self.queue_dict[path]
def finishQueueMessage(self, activity_tool, m): def finishQueueMessage(self, activity_tool_path, m):
if m.is_registered: if m.is_registered:
# XXX - Some lock is required on this section # XXX - Some lock is required on this section
self.last_uid = self.last_uid + 1 self.last_uid = self.last_uid + 1
m.uid = self.last_uid m.uid = self.last_uid
self.getQueue(activity_tool).append(m) self.getQueue(activity_tool_path).append(m)
def finishDeleteMessage(self, activity_tool, m): def finishDeleteMessage(self, activity_tool_path, m):
i = 0 i = 0
queue = self.getQueue(activity_tool) queue = self.getQueue(activity_tool_path)
for my_message in queue: for my_message in queue:
if my_message.uid == m.uid: if my_message.uid == m.uid:
del queue[i] del queue[i]
...@@ -67,7 +64,8 @@ class RAMQueue(Queue): ...@@ -67,7 +64,8 @@ class RAMQueue(Queue):
i = i + 1 i = i + 1
def dequeueMessage(self, activity_tool, processing_node): def dequeueMessage(self, activity_tool, processing_node):
for m in self.getQueue(activity_tool): path = activity_tool.getPhysicalPath()
for m in self.getQueue(path):
if m.validate(self, activity_tool) is not VALID: if m.validate(self, activity_tool) is not VALID:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
get_transaction().commit() # Start a new transaction get_transaction().commit() # Start a new transaction
...@@ -89,7 +87,8 @@ class RAMQueue(Queue): ...@@ -89,7 +87,8 @@ class RAMQueue(Queue):
else: else:
object_path = None object_path = None
active_process = kw.get('active_process', None) active_process = kw.get('active_process', None)
for m in self.getQueue(activity_tool): path = activity_tool.getPhysicalPath()
for m in self.getQueue(path):
# Filter active process and path if defined # Filter active process and path if defined
if active_process is None or m.active_process == active_process: if active_process is None or m.active_process == active_process:
if object_path is None or m.object_path == object_path: if object_path is None or m.object_path == object_path:
...@@ -110,7 +109,8 @@ class RAMQueue(Queue): ...@@ -110,7 +109,8 @@ class RAMQueue(Queue):
else: else:
activity_tool.unregisterMessage(self, m) activity_tool.unregisterMessage(self, m)
# Parse each message in queue # Parse each message in queue
for m in self.getQueue(activity_tool): path = activity_tool.getPhysicalPath()
for m in self.getQueue(path):
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if m.validate(self, activity_tool) is not VALID: if m.validate(self, activity_tool) is not VALID:
self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling) self.deleteMessage(activity_tool, m) # Trash messages which are not validated (no error handling)
...@@ -124,7 +124,8 @@ class RAMQueue(Queue): ...@@ -124,7 +124,8 @@ class RAMQueue(Queue):
def getMessageList(self, activity_tool, processing_node=None,**kw): def getMessageList(self, activity_tool, processing_node=None,**kw):
new_queue = [] new_queue = []
for m in self.getQueue(activity_tool): path = activity_tool.getPhysicalPath()
for m in self.getQueue(path):
m.processing_node = 1 m.processing_node = 1
m.priority = 0 m.priority = 0
new_queue.append(m) new_queue.append(m)
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
from Shared.DC.ZRDB.TM import TM from Shared.DC.ZRDB.TM import TM
from zLOG import LOG, ERROR, INFO from zLOG import LOG, ERROR, INFO
import sys import sys
from thread import allocate_lock, get_ident
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
...@@ -37,19 +38,28 @@ class ActivityBuffer(TM): ...@@ -37,19 +38,28 @@ class ActivityBuffer(TM):
_p_oid=_p_changed=_registered=None _p_oid=_p_changed=_registered=None
def __init__(self, activity_tool=None): def __init__(self, activity_tool=None):
from thread import allocate_lock
self._use_TM = self._transactions = 1 self._use_TM = self._transactions = 1
if self._use_TM: if self._use_TM:
self._tlock = allocate_lock() self._tlock = allocate_lock()
self._tthread = None self._tthread = None
self._lock = allocate_lock() self._lock = allocate_lock()
if activity_tool is not None:
# Directly store the activity tool as an attribute. At the beginning
# the activity tool was stored as a part of the key in queued_activity and
# in flushed_activity, but this is not nice because in that case we must
# use hash on it, and when there is no uid on activity tool, it is
# impossible to generate a new uid because acquisition is not available
# in the dictionary.
assert activity_tool is not None
self._activity_tool = activity_tool self._activity_tool = activity_tool
# Referring to a persistent object is dangerous when finishing a transaction,
# so store only the required information.
self._activity_tool_path = activity_tool.getPhysicalPath()
# Keeps a list of messages to add and remove # Keeps a list of messages to add and remove
# at end of transaction # at end of transaction
def _begin(self, *ignored): def _begin(self, *ignored):
from thread import get_ident
from ActivityTool import activity_list from ActivityTool import activity_list
self._tlock.acquire() self._tlock.acquire()
self._tthread = get_ident() self._tthread = get_ident()
...@@ -73,7 +83,6 @@ class ActivityBuffer(TM): ...@@ -73,7 +83,6 @@ class ActivityBuffer(TM):
raise raise
def _finish(self, *ignored): def _finish(self, *ignored):
from thread import get_ident
if not self._tlock.locked() or self._tthread != get_ident(): if not self._tlock.locked() or self._tthread != get_ident():
LOG('ActivityBuffer', INFO, "ignoring _finish") LOG('ActivityBuffer', INFO, "ignoring _finish")
return return
...@@ -82,10 +91,10 @@ class ActivityBuffer(TM): ...@@ -82,10 +91,10 @@ class ActivityBuffer(TM):
# Try to push / delete all messages # Try to push / delete all messages
for (activity, message) in self.flushed_activity: for (activity, message) in self.flushed_activity:
#LOG('ActivityBuffer finishDeleteMessage', ERROR, str(message.method_id)) #LOG('ActivityBuffer finishDeleteMessage', ERROR, str(message.method_id))
activity.finishDeleteMessage(self._activity_tool, message) activity.finishDeleteMessage(self._activity_tool_path, message)
for (activity, message) in self.queued_activity: for (activity, message) in self.queued_activity:
#LOG('ActivityBuffer finishQueueMessage', ERROR, str(message.method_id)) #LOG('ActivityBuffer finishQueueMessage', ERROR, str(message.method_id))
activity.finishQueueMessage(self._activity_tool, message) activity.finishQueueMessage(self._activity_tool_path, message)
except: except:
LOG('ActivityBuffer', ERROR, "exception during _finish", LOG('ActivityBuffer', ERROR, "exception during _finish",
error=sys.exc_info()) error=sys.exc_info())
...@@ -94,7 +103,6 @@ class ActivityBuffer(TM): ...@@ -94,7 +103,6 @@ class ActivityBuffer(TM):
self._tlock.release() self._tlock.release()
def _abort(self, *ignored): def _abort(self, *ignored):
from thread import get_ident
if not self._tlock.locked() or self._tthread != get_ident(): if not self._tlock.locked() or self._tthread != get_ident():
LOG('ActivityBuffer', 0, "ignoring _abort") LOG('ActivityBuffer', 0, "ignoring _abort")
return return
...@@ -105,7 +113,6 @@ class ActivityBuffer(TM): ...@@ -105,7 +113,6 @@ class ActivityBuffer(TM):
return return
if not self.requires_prepare: return if not self.requires_prepare: return
self.requires_prepare = 0 self.requires_prepare = 0
from thread import get_ident
if not self._tlock.locked() or self._tthread != get_ident(): if not self._tlock.locked() or self._tthread != get_ident():
LOG('ActivityBuffer', 0, "ignoring tpc_prepare") LOG('ActivityBuffer', 0, "ignoring tpc_prepare")
return return
...@@ -134,14 +141,6 @@ class ActivityBuffer(TM): ...@@ -134,14 +141,6 @@ class ActivityBuffer(TM):
def deferredQueueMessage(self, activity_tool, activity, message): def deferredQueueMessage(self, activity_tool, activity, message):
self._register() self._register()
# Directly store the activity tool as an attribute. At the beginning
# the activity tool was stored as a part of the key in queued_activity and
# in flushed_activity, but this is not nice because in that case we must
# use hash on it, and when there is no uid on activity tool, it is
# impossible to generate a new uid because acquisition is not available
# in the dictionnary.
if getattr(self,'_activity_tool',None) is None:
self._activity_tool = activity_tool
# Activity is called to prevent queuing some messages (useful for example # Activity is called to prevent queuing some messages (useful for example
# to prevent reindexing objects multiple times) # to prevent reindexing objects multiple times)
if not activity.isMessageRegistered(self, activity_tool, message): if not activity.isMessageRegistered(self, activity_tool, message):
...@@ -153,4 +152,3 @@ class ActivityBuffer(TM): ...@@ -153,4 +152,3 @@ class ActivityBuffer(TM):
def deferredDeleteMessage(self, activity_tool, activity, message): def deferredDeleteMessage(self, activity_tool, activity, message):
self._register() self._register()
self.flushed_activity.append((activity, message)) self.flushed_activity.append((activity, message))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment