Commit 78385659 authored by Jean-Paul Smets's avatar Jean-Paul Smets

SQL uses uid

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@385 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 46be595d
...@@ -67,44 +67,52 @@ class SQLDict(RAMDict): ...@@ -67,44 +67,52 @@ class SQLDict(RAMDict):
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority) result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority)
if len(result) == 0: if len(result) == 0:
# If empty, take any message # If empty, take any message
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=None) priority = None
result = activity_tool.SQLDict_readMessage(processing_node=processing_node, priority=priority)
if len(result) > 0: if len(result) > 0:
line = result[0] line = result[0]
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
uid_list = activity_tool.SQLDict_readUidList( path=path, method_id= method_id, processing_node = None )
uid_list = map(lambda x:x.uid, uid_list)
# Make sure message can not be processed anylonger # Make sure message can not be processed anylonger
activity_tool.SQLDict_processMessage(path=path, method_id=method_id, processing_node = processing_node) if len(uid_list) > 0:
activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
# Make sure object exists # Make sure object exists
if not m.validate(self, activity_tool): if not m.validate(self, activity_tool):
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
# This is an error # This is an error
activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = VALIDATE_ERROR_STATE) if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
# Lower priority # Lower priority
activity_tool.SQLDict_setPriority(path=path, method_id=method_id, processing_node = processing_node, if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list,
priority = line.priority + 1) priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
else: else:
# Try to invoke # Try to invoke
activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ? activity_tool.invoke(m) # Try to invoke the message - what happens if read conflict error restarts transaction ?
if m.is_executed: # Make sure message could be invoked if m.is_executed: # Make sure message could be invoked
activity_tool.SQLDict_delMessage(path=path, method_id=method_id, if len(uid_list) > 0:
processing_node=processing_node, processing=1) # Delete it activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
get_transaction().commit() # If successful, commit get_transaction().commit() # If successful, commit
else: else:
get_transaction().abort() # If not, abort transaction and start a new one get_transaction().abort() # If not, abort transaction and start a new one
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
# This is an error # This is an error
activity_tool.SQLDict_assignMessage(path=path, method_id=method_id, processing_node = INVOKE_ERROR_STATE) if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
# Lower priority # Lower priority
activity_tool.SQLDict_setPriority(path=path, method_id=method_id, processing_node = processing_node, if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list,
priority = line.priority + 1) priority = line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
...@@ -131,6 +139,8 @@ class SQLDict(RAMDict): ...@@ -131,6 +139,8 @@ class SQLDict(RAMDict):
NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible NOTE: commiting is very likely nonsenses here. We should just avoid to flush as much as possible
""" """
path = '/'.join(object_path) path = '/'.join(object_path)
uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,processing_node=None)
uid_list = map(lambda x:x.uid, uid_list)
# LOG('Flush', 0, str((path, invoke, method_id))) # LOG('Flush', 0, str((path, invoke, method_id)))
if invoke: if invoke:
result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None) result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,processing_node=None)
...@@ -155,7 +165,8 @@ class SQLDict(RAMDict): ...@@ -155,7 +165,8 @@ class SQLDict(RAMDict):
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
# Erase all messages in a single transaction # Erase all messages in a single transaction
activity_tool.SQLDict_delMessage(path=path, method_id=method_id) # Delete all "old" messages (not -1 processing) if len(uid_list) > 0:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete all "old" messages (not -1 processing)
def getMessageList(self, activity_tool, processing_node=None): def getMessageList(self, activity_tool, processing_node=None):
message_list = [] message_list = []
...@@ -177,7 +188,7 @@ class SQLDict(RAMDict): ...@@ -177,7 +188,7 @@ class SQLDict(RAMDict):
if not path_dict.has_key(path): if not path_dict.has_key(path):
# Only assign once (it would be different for a queue) # Only assign once (it would be different for a queue)
path_dict[path] = 1 path_dict[path] = 1
activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node) activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None)
get_transaction().commit() # Release locks immediately to allow processing of messages get_transaction().commit() # Release locks immediately to allow processing of messages
processing_node = processing_node + 1 processing_node = processing_node + 1
if processing_node > node_count: if processing_node > node_count:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment