Commit 563c86c1 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Ensure that SQLDict never leave messages as processed

when a ConflictError occurs.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@8157 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 196a98b1
...@@ -222,8 +222,8 @@ class SQLDict(RAMDict): ...@@ -222,8 +222,8 @@ class SQLDict(RAMDict):
method_id = line.method_id method_id = line.method_id
order_validation_text = line.order_validation_text order_validation_text = line.order_validation_text
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id, uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
processing_node = None, to_date = now_date, processing_node = None, to_date = now_date,
order_validation_text = order_validation_text) order_validation_text = order_validation_text)
uid_list = [x.uid for x in uid_list] uid_list = [x.uid for x in uid_list]
uid_list_list = [uid_list] uid_list_list = [uid_list]
priority_list = [line.priority] priority_list = [line.priority]
...@@ -233,20 +233,21 @@ class SQLDict(RAMDict): ...@@ -233,20 +233,21 @@ class SQLDict(RAMDict):
activity_tool.SQLDict_processMessage(uid = uid_list) activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
m = self.loadMessage(line.message, uid = line.uid)
message_list = [m] # At this point, messages are marked as processed. So catch any kind of exception to make sure
# Validate message (make sure object exists, priority OK, etc.) # that they are unmarked on error.
if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): try:
m = self.loadMessage(line.message, uid = line.uid)
message_list = [m]
# Validate message (make sure object exists, priority OK, etc.)
if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
return 0
group_method_id = m.activity_kw.get('group_method_id') group_method_id = m.activity_kw.get('group_method_id')
if group_method_id is not None: if group_method_id is not None:
# Count the number of objects to prevent too many objects. # Count the number of objects to prevent too many objects.
if m.hasExpandMethod(): if m.hasExpandMethod():
try: count = len(m.getObjectList(activity_tool))
count = len(m.getObjectList(activity_tool))
except:
# Here, simply ignore an exception. The same exception should be handled later.
LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
count = 0
else: else:
count = 1 count = 1
...@@ -255,8 +256,8 @@ class SQLDict(RAMDict): ...@@ -255,8 +256,8 @@ class SQLDict(RAMDict):
if count < MAX_GROUPED_OBJECTS: if count < MAX_GROUPED_OBJECTS:
# Retrieve objects which have the same group method. # Retrieve objects which have the same group method.
result = readMessage(processing_node = processing_node, priority = priority, result = readMessage(processing_node = processing_node, priority = priority,
to_date = now_date, group_method_id = group_method_id, to_date = now_date, group_method_id = group_method_id,
order_validation_text = order_validation_text) order_validation_text = order_validation_text)
#LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result))) #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
for line in result: for line in result:
path = line.path path = line.path
...@@ -269,78 +270,90 @@ class SQLDict(RAMDict): ...@@ -269,78 +270,90 @@ class SQLDict(RAMDict):
# Set selected messages to processing # Set selected messages to processing
activity_tool.SQLDict_processMessage(uid = uid_list) activity_tool.SQLDict_processMessage(uid = uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
# Save this newly marked uids as soon as possible.
uid_list_list.append(uid_list)
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
if m.hasExpandMethod(): if m.hasExpandMethod():
try: count += len(m.getObjectList(activity_tool))
count += len(m.getObjectList(activity_tool))
except:
# Here, simply ignore an exception. The same exception should be handled later.
LOG('SQLDict', 0, 'ignoring an exception from getObjectList', error=sys.exc_info())
pass
else: else:
count += 1 count += 1
message_list.append(m) message_list.append(m)
uid_list_list.append(uid_list)
priority_list.append(line.priority) priority_list.append(line.priority)
if count >= MAX_GROUPED_OBJECTS: if count >= MAX_GROUPED_OBJECTS:
break break
else:
# Release locks before starting a potentially long calculation # If the uids were not valid, remove them from the list, as validateMessage
# unmarked them.
uid_list_list.pop()
# Release locks before starting a potentially long calculation
get_transaction().commit()
except:
# If an exception occurs, abort the transaction to minimize the impact,
# then simply delay the operations.
get_transaction().abort()
for uid_list in uid_list_list:
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
retry = 1)
get_transaction().commit() get_transaction().commit()
# Try to invoke return 0
if group_method_id is not None:
LOG('SQLDict', TRACE, # Try to invoke
'invoking a group method %s with %d objects '\ if group_method_id is not None:
' (%d objects in expanded form)' % ( LOG('SQLDict', TRACE,
group_method_id, len(message_list), count)) 'invoking a group method %s with %d objects '\
activity_tool.invokeGroup(group_method_id, message_list) ' (%d objects in expanded form)' % (
else: group_method_id, len(message_list), count))
activity_tool.invoke(message_list[0]) activity_tool.invokeGroup(group_method_id, message_list)
else:
# Check if messages are executed successfully. activity_tool.invoke(message_list[0])
# When some of them are executed successfully, it may not be acceptable to
# abort the transaction, because these remain pending, only due to other # Check if messages are executed successfully.
# invalid messages. This means that a group method should not be used if # When some of them are executed successfully, it may not be acceptable to
# it has a side effect. For now, only indexing uses a group method, and this # abort the transaction, because these remain pending, only due to other
# has no side effect. # invalid messages. This means that a group method should not be used if
for m in message_list: # it has a side effect. For now, only indexing uses a group method, and this
if m.is_executed: # has no side effect.
break for m in message_list:
if m.is_executed:
break
else:
get_transaction().abort()
for i in xrange(len(message_list)):
m = message_list[i]
uid_list = uid_list_list[i]
priority = priority_list[i]
if m.is_executed:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it
get_transaction().commit() # If successful, commit
if m.active_process:
active_process = activity_tool.unrestrictedTraverse(m.active_process)
if not active_process.hasActivity():
# No more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
else: else:
get_transaction().abort() if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
# If this is a conflict error, do not lower the priority but only delay.
for i in xrange(len(message_list)): activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
m = message_list[i] retry = 1)
uid_list = uid_list_list[i] get_transaction().commit() # Release locks before starting a potentially long calculation
priority = priority_list[i] elif priority > MAX_PRIORITY:
if m.is_executed: # This is an error
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it if len(uid_list) > 0:
get_transaction().commit() # If successful, commit activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
if m.active_process: # Assign message back to 'error' state
active_process = activity_tool.unrestrictedTraverse(m.active_process) m.notifyUser(activity_tool) # Notify Error
if not active_process.hasActivity(): get_transaction().commit() # and commit
# No more activity
m.notifyUser(activity_tool, message="Process Finished") # XXX commit bas ???
else: else:
if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError): # Lower priority
# If this is a conflict error, do not lower the priority but only delay. if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
retry = 1) priority = priority + 1, retry = 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
elif priority > MAX_PRIORITY:
# This is an error
if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE)
# Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit
else:
# Lower priority
if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY,
priority = priority + 1, retry = 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment