Commit f73c4682 authored by Jean-Paul Smets's avatar Jean-Paul Smets

new tentative to postpone locks

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@359 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 7748f90b
No related merge requests found
...@@ -32,7 +32,7 @@ from RAMDict import RAMDict ...@@ -32,7 +32,7 @@ from RAMDict import RAMDict
from zLOG import LOG from zLOG import LOG
MAX_RETRY = 5 MAX_PRIORITY = 5
DISTRIBUTABLE_STATE = -1 DISTRIBUTABLE_STATE = -1
INVOKE_ERROR_STATE = -2 INVOKE_ERROR_STATE = -2
...@@ -45,6 +45,9 @@ priority_weight = \ ...@@ -45,6 +45,9 @@ priority_weight = \
[4] * 5 + \ [4] * 5 + \
[5] * 1 [5] * 1
class ActivityFlushError(Exception):
"""Error during active message flush"""
class SQLDict(RAMDict): class SQLDict(RAMDict):
""" """
A simple OOBTree based queue. It should be compatible with transactions A simple OOBTree based queue. It should be compatible with transactions
...@@ -73,32 +76,37 @@ class SQLDict(RAMDict): ...@@ -73,32 +76,37 @@ class SQLDict(RAMDict):
activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node = processing_node) activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node = processing_node)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
retry = 0 # Make sure object exists
while retry < MAX_RETRY: if not m.validate(self, activity_tool):
if m.validate(self, activity_tool): # We should validate each time XXX in case someone is deleting it at the same time if line.priority > MAX_PRIORITY:
valid = 1 # This is an error
activity_tool.invoke(m) # Try to invoke the message activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = VALIDATE_ERROR_STATE)
if m.is_executed: # Assign message back to 'error' state
retry=MAX_RETRY get_transaction().commit() # and commit
else: else:
get_transaction().abort() # Abort and retry # Lower priority
retry = retry + 1 activity_tool.SQLDict_setPriority(path=path, method_id=method_id, processing_node = processing_node,
priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else: else:
valid = 0 # Try to invoke
retry=MAX_RETRY activity_tool.invoke(m) # Try to invoke the message
if valid: # We should validate each time XXX in case someone is deleting it at the same time
if m.is_executed: # Make sure message could be invoked if m.is_executed: # Make sure message could be invoked
activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=processing_node) # Delete it activity_tool.SQLDict_delMessage(path=path, method_id=method_id,
processing_node=processing_node, processing=1) # Delete it
get_transaction().commit() # If successful, commit get_transaction().commit() # If successful, commit
else: else:
get_transaction().abort() # If not, abort transaction and start a new one get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY:
# This is an error
activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = INVOKE_ERROR_STATE) activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = VALIDATE_ERROR_STATE) # Lower priority
# Assign message back to 'error' state activity_tool.SQLDict_setPriority(path=path, method_id=method_id, processing_node = processing_node,
get_transaction().commit() # and commit priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 1 return 1
...@@ -119,13 +127,16 @@ class SQLDict(RAMDict): ...@@ -119,13 +127,16 @@ class SQLDict(RAMDict):
- if we do not commit, then we can use flush in a larger transaction - if we do not commit, then we can use flush in a larger transaction
commit should in general not be used commit should in general not be used
NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
""" """
path = '/'.join(object_path) path = '/'.join(object_path)
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
if invoke:
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
if commit: get_transaction().commit() # Release locks before starting a potentially long calculation if commit: get_transaction().commit() # Release locks before starting a potentially long calculation
method_dict = {} method_dict = {}
if invoke: # Parse each message
for line in result: for line in result:
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
...@@ -133,26 +144,20 @@ class SQLDict(RAMDict): ...@@ -133,26 +144,20 @@ class SQLDict(RAMDict):
# Only invoke once (it would be different for a queue) # Only invoke once (it would be different for a queue)
method_dict[method_id] = 1 method_dict[method_id] = 1
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
retry = 0 # First Validate
while retry < MAX_RETRY: if m.validate(self, activity_tool):
if m.validate(self, activity_tool): # We should validate each time XXX in case someone is deleting it at the same time
valid = 1
activity_tool.invoke(m) # Try to invoke the message activity_tool.invoke(m) # Try to invoke the message
if m.is_executed: if not m.is_executed: # Make sure message could be invoked
retry=MAX_RETRY
else:
get_transaction().abort() # Abort and retry
retry = retry + 1
else:
valid = 0
retry=MAX_RETRY
if valid: # We should validate each time XXX in case someone is deleting it at the same time
if m.is_executed: # Make sure message could be invoked
activity_tool.SQLDict_delMessage(path=path, method_id=method_id, processing_node=None) # Delete it
if commit: get_transaction().commit() # If successful, commit
else:
if commit: get_transaction().abort() # If not, abort transaction and start a new one if commit: get_transaction().abort() # If not, abort transaction and start a new one
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path))
else: else:
if commit: get_transaction().abort() # If not, abort transaction and start a new one
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
# Erase all messages in a single transaction
activity_tool.SQLDict_delMessage(path=path, method_id=method_id) # Delete all activity_tool.SQLDict_delMessage(path=path, method_id=method_id) # Delete all
if commit: get_transaction().commit() # Commit flush if commit: get_transaction().commit() # Commit flush
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment