Commit 1c300e5c authored by Yoshinori Okuji's avatar Yoshinori Okuji

This big change optimizes the scheduling of active objects,

and fix some bugs.

The basic idea is to track a dependency graph to find executable
messages quickly. This makes the activity system far more efficient,
when you have many inter-dependent messages queued in the tables.

Also, this obsoletes the time shifting in the schedulers,
as executable messages can be found in a more efficient manner.
So the activity parameter "at_date" should work expectedly.

Now the API of validate methods in Activities return a
list of message objects instead of a boolean value. Such
a list contains messages that are depended upon by a given
message.

The validate method in Message accepts a new optional
parameter, check_order_validation, to indicate whether
order validation should be performed. The default behavior
has not changed.

getDependentMessageList is added to ActivityTool, Queue
and Message. This method collects dependent message for
a given message from all activities.

There are some other subtle changes. Look at the diffs for
more details.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@14039 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent df5ccf5a
...@@ -26,12 +26,17 @@ ...@@ -26,12 +26,17 @@
# #
############################################################################## ##############################################################################
import pickle, sys import cPickle, sys
from Acquisition import aq_base
from DateTime import DateTime from DateTime import DateTime
from Products.CMFActivity.ActivityTool import Message from zLOG import LOG, WARNING, ERROR
from zLOG import LOG
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
import sha
from cStringIO import StringIO
try:
from transaction import get as get_transaction
except ImportError:
pass
# Error values for message validation # Error values for message validation
EXCEPTION = -1 EXCEPTION = -1
...@@ -122,7 +127,7 @@ class Queue: ...@@ -122,7 +127,7 @@ class Queue:
self.is_awake[processing_node] = 0 self.is_awake[processing_node] = 0
self.is_alive[processing_node] = 0 self.is_alive[processing_node] = 0
def validate(self, activity_tool, message, **kw): def validate(self, activity_tool, message, check_order_validation=1, **kw):
""" """
This is the place where activity semantics is implemented This is the place where activity semantics is implemented
**kw contains all parameters which allow to implement synchronisation, **kw contains all parameters which allow to implement synchronisation,
...@@ -145,22 +150,79 @@ class Queue: ...@@ -145,22 +150,79 @@ class Queue:
try: try:
if activity_tool.unrestrictedTraverse(message.object_path, None) is None: if activity_tool.unrestrictedTraverse(message.object_path, None) is None:
# Do not try to call methods on objects which do not exist # Do not try to call methods on objects which do not exist
LOG('WARNING ActivityTool', 0, LOG('CMFActivity', WARNING,
'Object %s does not exist' % '/'.join(message.object_path)) 'Object %s does not exist' % '/'.join(message.object_path))
return INVALID_PATH return INVALID_PATH
for k, v in kw.items(): if check_order_validation:
for k, v in kw.iteritems():
if activity_tool.validateOrder(message, k, v): if activity_tool.validateOrder(message, k, v):
return INVALID_ORDER return INVALID_ORDER
except ConflictError: except ConflictError:
raise raise
except: except:
LOG('WARNING ActivityTool', 0, LOG('CMFActivity', WARNING,
'Validation of Object %s raised exception' % '/'.join(message.object_path), 'Validation of Object %s raised exception' % '/'.join(message.object_path),
error=sys.exc_info()) error=sys.exc_info())
# Do not try to call methods on objects which cause errors # Do not try to call methods on objects which cause errors
return EXCEPTION return EXCEPTION
return VALID return VALID
def getDependentMessageList(self, activity_tool, message, **kw):
message_list = []
for k, v in kw.iteritems():
result = activity_tool.getDependentMessageList(message, k, v)
if result:
message_list.extend(result)
return message_list
def getExecutableMessageList(self, activity_tool, message, message_dict,
validation_text_dict):
"""Get messages which have no dependent message, and store them in the dictionary.
If the passed message itself is executable, simply store only that message.
Otherwise, try to find at least one message executable from dependent messages.
This may result in no new message, if all dependent messages are already present
in the dictionary, if all dependent messages are in different activities, or if
the message has a circular dependency.
The validation text dictionary is used only to cache the results of validations,
in order to reduce the number of SQL queries.
"""
if message.uid in message_dict:
# Nothing to do. But detect a circular dependency.
if message_dict[message.uid] is None:
LOG('CMFActivity', ERROR,
'message uid %r has a circular dependency' % (message.uid,))
return
cached_result = validation_text_dict.get(message.order_validation_text)
if cached_result is None:
message_list = message.getDependentMessageList(self, activity_tool)
get_transaction().commit() # Release locks.
if message_list:
# The result is not empty, so this message is not executable.
validation_text_dict[message.order_validation_text] = 0
now_date = DateTime()
for activity, m in message_list:
# Note that the messages may contain ones which are already assigned or not
# executable yet.
if activity is self and m.processing_node == -1 and m.date <= now_date:
# Call recursively. Set None as a marker to detect a circular dependency.
message_dict[message.uid] = None
try:
self.getExecutableMessageList(activity_tool, m, message_dict,
validation_text_dict)
finally:
del message_dict[message.uid]
else:
validation_text_dict[message.order_validation_text] = 1
message_dict[message.uid] = message
elif cached_result:
message_dict[message.uid] = message
else:
pass
def isAwake(self, activity_tool, processing_node): def isAwake(self, activity_tool, processing_node):
return self.is_awake[processing_node] return self.is_awake[processing_node]
...@@ -179,12 +241,30 @@ class Queue: ...@@ -179,12 +241,30 @@ class Queue:
pass pass
def loadMessage(self, s, **kw): def loadMessage(self, s, **kw):
m = pickle.loads(s) m = cPickle.load(StringIO(s))
m.__dict__.update(kw) m.__dict__.update(kw)
return m return m
def dumpMessage(self, m): def dumpMessage(self, m):
return pickle.dumps(m) return cPickle.dumps(m)
def getOrderValidationText(self, message):
# Return an identifier of validators related to ordering.
order_validation_item_list = []
key_list = message.activity_kw.keys()
key_list.sort()
for key in key_list:
method_id = "_validate_%s" % key
if hasattr(self, method_id):
order_validation_item_list.append((key, message.activity_kw[key]))
if len(order_validation_item_list) == 0:
# When no order validation argument is specified, skip the computation
# of the checksum for speed. Here, 'none' is used, because this never be
# identical to SHA1 hexdigest (which is always 40 characters), and 'none'
# is true in Python. This is important, because dtml-if assumes that an empty
# string is false, so we must use a non-empty string for this.
return 'none'
return sha.new(repr(order_validation_item_list)).hexdigest()
def getMessageList(self, activity_tool, processing_node=None,**kw): def getMessageList(self, activity_tool, processing_node=None,**kw):
return [] return []
......
...@@ -27,8 +27,8 @@ ...@@ -27,8 +27,8 @@
############################################################################## ##############################################################################
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from Products.CMFActivity.Errors import ActivityFlushError
from Queue import Queue, VALID from Queue import Queue, VALID
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from zLOG import LOG from zLOG import LOG
......
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from Queue import Queue, VALID from Queue import Queue, VALID
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
......
...@@ -26,17 +26,15 @@ ...@@ -26,17 +26,15 @@
# #
############################################################################## ##############################################################################
import random
from DateTime import DateTime from DateTime import DateTime
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY
from RAMDict import RAMDict from RAMDict import RAMDict
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
import sys import sys
import sha from types import ClassType
from types import ClassType, StringType, ListType, TupleType
try: try:
from transaction import get as get_transaction from transaction import get as get_transaction
...@@ -141,50 +139,22 @@ class SQLDict(RAMDict): ...@@ -141,50 +139,22 @@ class SQLDict(RAMDict):
message_list = activity_buffer.getMessageList(self) message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered] return [m for m in message_list if m.is_registered]
def getOrderValidationText(self, message):
# Return an identifier of validators related to ordering.
order_validation_item_list = []
key_list = message.activity_kw.keys()
key_list.sort()
for key in key_list:
method_id = "_validate_%s" % key
if hasattr(self, method_id):
order_validation_item_list.append((key, message.activity_kw[key]))
if len(order_validation_item_list) == 0:
# When no order validation argument is specified, skip the computation
# of the checksum for speed. Here, 'none' is used, because this never be
# identical to SHA1 hexdigest (which is always 40 characters), and 'none'
# is true in Python. This is important, because dtml-if assumes that an empty
# string is false, so we must use a non-empty string for this.
return 'none'
return sha.new(repr(order_validation_item_list)).hexdigest()
def validateMessage(self, activity_tool, message, uid_list, priority, processing_node): def validateMessage(self, activity_tool, message, uid_list, priority, processing_node):
validation_state = message.validate(self, activity_tool) validation_state = message.validate(self, activity_tool, check_order_validation=0)
if validation_state is not VALID: if validation_state is not VALID:
if validation_state in (EXCEPTION, INVALID_PATH):
# There is a serious validation error - we must lower priority # There is a serious validation error - we must lower priority
if priority > MAX_PRIORITY: if priority > MAX_PRIORITY:
# This is an error # This is an error
if len(uid_list) > 0: if len(uid_list) > 0:
#LOG('SQLDict', 0, 'error uid_list = %r' % (uid_list,)) activity_tool.SQLDict_assignMessage(uid=uid_list, processing_node=VALIDATE_ERROR_STATE)
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = VALIDATE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
#m.notifyUser(activity_tool) # Notify Error #m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
#LOG('SQLDict', 0, 'lower priority uid_list = %r' % (uid_list,))
# Lower priority # Lower priority
if len(uid_list) > 0: # Add some delay before new processing if len(uid_list) > 0: # Add some delay before new processing
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
priority = priority + 1, retry = 1) priority=priority + 1, retry=1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else:
# We do not lower priority for INVALID_ORDER errors but we do postpone execution
#order_validation_text = self.getOrderValidationText(message)
activity_tool.SQLDict_setPriority(uid = uid_list,
delay = VALIDATION_ERROR_DELAY,
retry = 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
return 1 return 1
...@@ -196,41 +166,29 @@ class SQLDict(RAMDict): ...@@ -196,41 +166,29 @@ class SQLDict(RAMDict):
return 1 return 1
now_date = DateTime() now_date = DateTime()
priority = random.choice(priority_weight) result = readMessage(processing_node=processing_node, to_date=now_date)
# Try to find a message at given priority level which is scheduled for now if len(result) > 0:
result = readMessage(processing_node=processing_node, priority=priority,
to_date=now_date)
if len(result) == 0:
# If empty, take any message which is scheduled for now
priority = None
result = readMessage(processing_node=processing_node, priority=priority, to_date=now_date)
if len(result) == 0:
# If the result is still empty, shift the dates so that SQLDict can dispatch pending active
# objects quickly.
self.timeShift(activity_tool, VALIDATION_ERROR_DELAY, processing_node,retry=1)
else:
#LOG('SQLDict dequeueMessage', 100, 'result = %r' % (list(result)))
line = result[0] line = result[0]
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
order_validation_text = line.order_validation_text order_validation_text = line.order_validation_text
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id, uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
processing_node = None, to_date = now_date, processing_node=None, to_date=now_date,
order_validation_text = order_validation_text) order_validation_text=order_validation_text)
uid_list = [x.uid for x in uid_list] uid_list = [x.uid for x in uid_list]
uid_list_list = [uid_list] uid_list_list = [uid_list]
priority_list = [line.priority] priority_list = [line.priority]
# Make sure message can not be processed anylonger # Make sure message can not be processed anylonger
if len(uid_list) > 0: if len(uid_list) > 0:
# Set selected messages to processing # Set selected messages to processing
activity_tool.SQLDict_processMessage(uid = uid_list) activity_tool.SQLDict_processMessage(uid=uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
# This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state # This may lead (1 for 1,000,000 in case of reindexing) to messages left in processing state
# At this point, messages are marked as processed. So catch any kind of exception to make sure # At this point, messages are marked as processed. So catch any kind of exception to make sure
# that they are unmarked on error. # that they are unmarked on error.
try: try:
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid=line.uid)
message_list = [m] message_list = [m]
# Validate message (make sure object exists, priority OK, etc.) # Validate message (make sure object exists, priority OK, etc.)
if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): if not self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
...@@ -244,13 +202,13 @@ class SQLDict(RAMDict): ...@@ -244,13 +202,13 @@ class SQLDict(RAMDict):
else: else:
count = 1 count = 1
group_method = activity_tool.restrictedTraverse(group_method_id) group_method = activity_tool.getPortalObject().restrictedTraverse(group_method_id)
if count < MAX_GROUPED_OBJECTS: if count < MAX_GROUPED_OBJECTS:
# Retrieve objects which have the same group method. # Retrieve objects which have the same group method.
result = readMessage(processing_node = processing_node, priority = priority, result = readMessage(processing_node=processing_node,
to_date = now_date, group_method_id = group_method_id, to_date=now_date, group_method_id=group_method_id,
order_validation_text = order_validation_text) order_validation_text=order_validation_text)
#LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result))) #LOG('SQLDict dequeueMessage', 0, 'result = %d' % (len(result)))
path_and_method_id_dict = {} path_and_method_id_dict = {}
for line in result: for line in result:
...@@ -263,19 +221,20 @@ class SQLDict(RAMDict): ...@@ -263,19 +221,20 @@ class SQLDict(RAMDict):
continue continue
path_and_method_id_dict[key] = 1 path_and_method_id_dict[key] = 1
uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id, uid_list = activity_tool.SQLDict_readUidList(path=path, method_id=method_id,
processing_node = None, to_date = now_date, processing_node=None,
order_validation_text = order_validation_text) to_date=now_date,
order_validation_text=order_validation_text)
uid_list = [x.uid for x in uid_list] uid_list = [x.uid for x in uid_list]
if len(uid_list) > 0: if len(uid_list) > 0:
# Set selected messages to processing # Set selected messages to processing
activity_tool.SQLDict_processMessage(uid = uid_list) activity_tool.SQLDict_processMessage(uid=uid_list)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
# Save this newly marked uids as soon as possible. # Save this newly marked uids as soon as possible.
uid_list_list.append(uid_list) uid_list_list.append(uid_list)
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid=line.uid)
if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node): if self.validateMessage(activity_tool, m, uid_list, line.priority, processing_node):
if m.hasExpandMethod(): if m.hasExpandMethod():
count += len(m.getObjectList(activity_tool)) count += len(m.getObjectList(activity_tool))
...@@ -321,7 +280,8 @@ class SQLDict(RAMDict): ...@@ -321,7 +280,8 @@ class SQLDict(RAMDict):
get_transaction().abort() get_transaction().abort()
except: except:
# Unfortunately, database adapters may raise an exception against abort. # Unfortunately, database adapters may raise an exception against abort.
LOG('SQLDict', WARNING, 'abort failed, thus some objects may be modified accidentally') LOG('SQLDict', WARNING,
'abort failed, thus some objects may be modified accidentally')
pass pass
# An exception happens at somewhere else but invoke or invokeGroup, so messages # An exception happens at somewhere else but invoke or invokeGroup, so messages
...@@ -330,10 +290,11 @@ class SQLDict(RAMDict): ...@@ -330,10 +290,11 @@ class SQLDict(RAMDict):
for uid_list in uid_list_list: for uid_list in uid_list_list:
if len(uid_list): if len(uid_list):
# This only sets processing to zero. # This only sets processing to zero.
activity_tool.SQLDict_setPriority(uid = uid_list) activity_tool.SQLDict_setPriority(uid=uid_list)
get_transaction().commit() get_transaction().commit()
except: except:
LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception', LOG('SQLDict', ERROR,
'SQLDict.dequeueMessage raised, and cannot even set processing to zero due to an exception',
error=sys.exc_info()) error=sys.exc_info())
raise raise
return 0 return 0
...@@ -345,7 +306,7 @@ class SQLDict(RAMDict): ...@@ -345,7 +306,7 @@ class SQLDict(RAMDict):
priority = priority_list[i] priority = priority_list[i]
if m.is_executed: if m.is_executed:
if len(uid_list) > 0: if len(uid_list) > 0:
activity_tool.SQLDict_delMessage(uid = uid_list) # Delete it activity_tool.SQLDict_delMessage(uid=uid_list) # Delete it
get_transaction().commit() # If successful, commit get_transaction().commit() # If successful, commit
if m.active_process: if m.active_process:
active_process = activity_tool.unrestrictedTraverse(m.active_process) active_process = activity_tool.unrestrictedTraverse(m.active_process)
...@@ -355,24 +316,25 @@ class SQLDict(RAMDict): ...@@ -355,24 +316,25 @@ class SQLDict(RAMDict):
else: else:
if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError): if type(m.exc_type) is ClassType and issubclass(m.exc_type, ConflictError):
# If this is a conflict error, do not lower the priority but only delay. # If this is a conflict error, do not lower the priority but only delay.
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY)
retry = 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
elif priority > MAX_PRIORITY: elif priority > MAX_PRIORITY:
# This is an error # This is an error
if len(uid_list) > 0: if len(uid_list) > 0:
activity_tool.SQLDict_assignMessage(uid = uid_list, processing_node = INVOKE_ERROR_STATE) activity_tool.SQLDict_assignMessage(uid=uid_list,
processing_node=INVOKE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error m.notifyUser(activity_tool) # Notify Error
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
# Lower priority # Lower priority
if len(uid_list) > 0: if len(uid_list) > 0:
activity_tool.SQLDict_setPriority(uid = uid_list, delay = VALIDATION_ERROR_DELAY, activity_tool.SQLDict_setPriority(uid=uid_list, delay=VALIDATION_ERROR_DELAY,
priority = priority + 1, retry = 1) priority=priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
except: except:
LOG('SQLDict', ERROR, 'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages', LOG('SQLDict', ERROR,
'SQLDict.dequeueMessage raised an exception during checking for the results of processed messages',
error=sys.exc_info()) error=sys.exc_info())
raise raise
...@@ -411,7 +373,7 @@ class SQLDict(RAMDict): ...@@ -411,7 +373,7 @@ class SQLDict(RAMDict):
if readMessageList is not None: if readMessageList is not None:
# Parse each message in registered # Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if list(m.object_path) == list(object_path) and (method_id is None or method_id == m.method_id): if m.object_path == object_path and (method_id is None or method_id == m.method_id):
#if not method_dict.has_key(method_id or m.method_id): #if not method_dict.has_key(method_id or m.method_id):
if not method_dict.has_key(m.method_id): if not method_dict.has_key(m.method_id):
method_dict[m.method_id] = 1 # Prevents calling invoke twice method_dict[m.method_id] = 1 # Prevents calling invoke twice
...@@ -469,13 +431,13 @@ class SQLDict(RAMDict): ...@@ -469,13 +431,13 @@ class SQLDict(RAMDict):
if len(uid_list)>0: if len(uid_list)>0:
activity_tool.SQLDict_delMessage(uid = [x.uid for x in uid_list]) activity_tool.SQLDict_delMessage(uid = [x.uid for x in uid_list])
def getMessageList(self, activity_tool, processing_node=None,include_processing=0,**kw): def getMessageList(self, activity_tool, processing_node=None, include_processing=0, **kw):
# YO: reading all lines might cause a deadlock # YO: reading all lines might cause a deadlock
message_list = [] message_list = []
readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None) readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
if readMessageList is not None: if readMessageList is not None:
result = readMessageList(path=None, method_id=None, processing_node=None, result = readMessageList(path=None, method_id=None, processing_node=None,
to_processing_date=None,include_processing=include_processing) to_processing_date=None, include_processing=include_processing)
for line in result: for line in result:
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
m.processing_node = line.processing_node m.processing_node = line.processing_node
...@@ -496,145 +458,155 @@ class SQLDict(RAMDict): ...@@ -496,145 +458,155 @@ class SQLDict(RAMDict):
return message_list return message_list
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
processing_node = 1
readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None) readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
if readMessageList is not None: if readMessageList is not None:
now_date = DateTime() now_date = DateTime()
if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME: result = readMessageList(path=None, method_id=None, processing_node=-1,
# Sticky processing messages should be set back to non processing to_date=now_date, include_processing=0)
max_processing_date = now_date - MAX_PROCESSING_TIME get_transaction().commit()
self.max_processing_date = now_date
else: validation_text_dict = {'none': 1}
max_processing_date = None message_dict = {}
result = readMessageList(path=None, method_id=None, processing_node = -1,
to_processing_date = max_processing_date,
include_processing=0) # Only assign non assigned messages
get_transaction().commit() # Release locks before starting a potentially long calculation
path_dict = {}
for line in result: for line in result:
path = line.path message = self.loadMessage(line.message, uid = line.uid,
broadcast = line.broadcast order_validation_text = line.order_validation_text)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict)
# XXX probably this below can be optimized by assigning multiple messages at a time.
path_dict = {}
assignMessage = activity_tool.SQLDict_assignMessage
processing_node = 1
for message in message_dict.itervalues():
path = '/'.join(message.object_path)
broadcast = message.activity_kw.get('broadcast', 0)
if broadcast: if broadcast:
# Broadcast messages must be distributed into all nodes. # Broadcast messages must be distributed into all nodes.
uid = line.uid uid = message.uid
activity_tool.SQLDict_assignMessage(processing_node=1, uid=[uid]) assignMessage(processing_node=1, uid=[uid])
if node_count > 1: if node_count > 1:
uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity', id_count=node_count - 1) id_tool = activity_tool.getPortalObject().portal_ids
for node in range(2, node_count+1): uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity',
activity_tool.SQLDict_writeMessage( uid = uid_list.pop(), id_count=node_count - 1)
path = path, path_list = [path] * (node_count - 1)
method_id = line.method_id, method_id_list = [message.method_id] * (node_count - 1)
priority = line.priority, priority_list = [message.activity_kw.get('priority', 1)] * (node_count - 1)
broadcast = 1, processing_node_list = range(2, node_count + 1)
processing_node = node, broadcast_list = [1] * (node_count - 1)
message = line.message, message_list = [self.dumpMessage(message)] * (node_count - 1)
date = line.date) date_list = [message.activity_kw.get('at_date', now_date)] * (node_count - 1)
elif not path_dict.has_key(path): group_method_id_list = [message.activity_kw.get('group_method_id', '')] * (node_count - 1)
# Only assign once (it would be different for a queue) tag_list = [message.activity_kw.get('tag', '')] * (node_count - 1)
path_dict[path] = 1 order_validation_text_list = [message.order_validation_text] * (node_count - 1)
activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node, uid=None, broadcast=0) activity_tool.SQLDict_writeMessageList(uid_list=uid_list,
get_transaction().commit() # Release locks immediately to allow processing of messages path_list=path_list,
processing_node = processing_node + 1 method_id_list=method_id_list,
priority_list=priority_list,
broadcast_list=broadcast_list,
processing_node_list=processing_node_list,
message_list=message_list,
date_list=date_list,
group_method_id_list=group_method_id_list,
tag_list=tag_list,
order_validation_text_list=order_validation_text_list)
get_transaction().commit()
else:
# Select a processing node. If the same path appears again, dispatch the message to
# the same node, so that object caching is more efficient. Otherwise, apply a round
# robin scheduling.
node = path_dict.get(path)
if node is None:
node = processing_node
path_dict[path] = node
processing_node += 1
if processing_node > node_count: if processing_node > node_count:
processing_node = 1 # Round robin processing_node = 1
assignMessage(processing_node=node, uid=[message.uid], broadcast=0)
get_transaction().commit() # Release locks immediately to allow processing of messages
# Validation private methods # Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
if isinstance(method_id, str):
method_id = [method_id]
if isinstance(path, str):
path = [path]
if isinstance(tag, str):
tag = [tag]
if method_id or message_uid or path or tag:
validateMessageList = activity_tool.SQLDict_validateMessageList
result = validateMessageList(method_id=method_id,
message_uid=message_uid,
path=path,
tag=tag)
message_list = []
for line in result:
m = self.loadMessage(line.message,
uid=line.uid,
order_validation_text=line.order_validation_text,
date=line.date,
processing_node=line.processing_node)
message_list.append(m)
return message_list
else:
return []
def _validate_after_method_id(self, activity_tool, message, value): def _validate_after_method_id(self, activity_tool, message, value):
# Count number of occurances of method_id return self._validate(activity_tool, method_id=value)
if type(value) is StringType:
value = [value]
if len(value)>0: # if empty list provided, the message is valid
result = activity_tool.SQLDict_validateMessageList(method_id=value, message_uid=None, path=None)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_path(self, activity_tool, message, value): def _validate_after_path(self, activity_tool, message, value):
# Count number of occurances of path return self._validate(activity_tool, path=value)
if type(value) is StringType:
value = [value]
if len(value)>0: # if empty list provided, the message is valid
result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=None, path=value)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_message_uid(self, activity_tool, message, value): def _validate_after_message_uid(self, activity_tool, message, value):
# Count number of occurances of message_uid return self._validate(activity_tool, message_uid=value)
result = activity_tool.SQLDict_validateMessageList(method_id=None, message_uid=value, path=None)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_path_and_method_id(self, activity_tool, message, value): def _validate_after_path_and_method_id(self, activity_tool, message, value):
# Count number of occurances of path and method_id if not isinstance(value, (tuple, list)) or len(value) < 2:
if (type(value) != TupleType and type(value) != ListType) or len(value)<2: LOG('CMFActivity', WARNING,
LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method_id : %s' % repr(value)) 'unable to recognize value for after_path_and_method_id: %r' % (value,))
return VALID return []
path = value[0] return self._validate(activity_tool, path=value[0], method_id=value[1])
method = value[1]
if type(path) is StringType:
path = [path]
if type(method) is StringType:
method = [method]
result = activity_tool.SQLDict_validateMessageList(method_id=method, message_uid=None, path=path)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_tag(self, activity_tool, message, value): def _validate_after_tag(self, activity_tool, message, value):
# Count number of occurances of tag return self._validate(activity_tool, tag=value)
if self.countMessageWithTag(activity_tool, value) > 0:
return INVALID_ORDER
return VALID
def countMessage(self, activity_tool, tag=None,path=None, def _validate_after_tag_and_method_id(self, activity_tool, message, value):
method_id=None,message_uid=None,**kw): # Count number of occurances of tag and method_id
""" if not isinstance(value, (tuple, list)) or len(value) < 2:
Return the number of message which match the given parameter. LOG('CMFActivity', WARNING,
'unable to recognize value for after_tag_and_method_id: %r' % (value,))
return []
return self._validate(activity_tool, tag=value[0], method_id=value[1])
def countMessage(self, activity_tool, tag=None, path=None,
method_id=None, message_uid=None, **kw):
"""Return the number of messages which match the given parameters.
""" """
if isinstance(tag, StringType): if isinstance(tag, str):
tag = [tag] tag = [tag]
if isinstance(path, StringType): if isinstance(path, str):
path = [path] path = [path]
if isinstance(message_uid, (int,long)): elif isinstance(method_id, str):
message_uid = [message_uid]
if isinstance(method_id, StringType):
method_id = [method_id] method_id = [method_id]
result = activity_tool.SQLDict_validateMessageList(method_id=method_id, result = activity_tool.SQLDict_validateMessageList(method_id=method_id,
path=path, path=path,
message_uid=message_uid, message_uid=message_uid,
tag=tag) tag=tag,
count=1)
return result[0].uid_count return result[0].uid_count
def countMessageWithTag(self, activity_tool, value): def countMessageWithTag(self, activity_tool, value):
"""Return the number of messages which match the given tag.
""" """
Return the number of message which match the given tag. return self.countMessage(activity_tool, tag=value)
"""
return self.countMessage(activity_tool,tag=value)
def _validate_after_tag_and_method_id(self, activity_tool, message, value):
# Count number of occurances of tag and method_id
if (type(value) != TupleType and type(value) != ListType) or len(value)<2:
LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value))
return VALID
tag = value[0]
method = value[1]
if type(tag) is StringType:
tag = [tag]
if type(method) is StringType:
method = [method]
result = activity_tool.SQLDict_validateMessageList(method_id=method, message_uid=None, tag=tag)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, activity_tool, delay, processing_node=None,retry=None): def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
""" """
To simulate timeShift, we simply substract delay from To simulate timeShift, we simply substract delay from
all dates in SQLDict message table all dates in SQLDict message table
""" """
activity_tool.SQLDict_timeShift(delay = delay, processing_node = processing_node,retry=retry) activity_tool.SQLDict_timeShift(delay=delay, processing_node=processing_node,retry=retry)
registerActivity(SQLDict) registerActivity(SQLDict)
...@@ -26,15 +26,14 @@ ...@@ -26,15 +26,14 @@
# #
############################################################################## ##############################################################################
import random
from Products.CMFActivity.ActivityTool import registerActivity from Products.CMFActivity.ActivityTool import registerActivity
from RAMQueue import RAMQueue from RAMQueue import RAMQueue
from DateTime import DateTime from DateTime import DateTime
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY from Queue import VALID, INVALID_PATH, VALIDATION_ERROR_DELAY
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE from Products.CMFActivity.ActiveObject import INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
from Products.CMFActivity.Errors import ActivityFlushError from Products.CMFActivity.Errors import ActivityFlushError
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
from types import StringType, ClassType from types import ClassType
import sys import sys
try: try:
...@@ -61,8 +60,9 @@ class SQLQueue(RAMQueue): ...@@ -61,8 +60,9 @@ class SQLQueue(RAMQueue):
""" """
def prepareQueueMessage(self, activity_tool, m): def prepareQueueMessage(self, activity_tool, m):
if m.is_registered: if m.is_registered:
#import pdb; pdb.set_trace() id_tool = activity_tool.getPortalObject().portal_ids
activity_tool.SQLQueue_writeMessage(uid = activity_tool.getPortalObject().portal_ids.generateNewLengthId(id_group='portal_activity_queue'), uid = id_tool.generateNewLengthId(id_group='portal_activity_queue')
activity_tool.SQLQueue_writeMessage(uid = uid,
path = '/'.join(m.object_path) , path = '/'.join(m.object_path) ,
method_id = m.method_id, method_id = m.method_id,
priority = m.activity_kw.get('priority', 1), priority = m.activity_kw.get('priority', 1),
...@@ -84,13 +84,7 @@ class SQLQueue(RAMQueue): ...@@ -84,13 +84,7 @@ class SQLQueue(RAMQueue):
now_date = DateTime() now_date = DateTime()
# Next processing date in case of error # Next processing date in case of error
next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400 next_processing_date = now_date + float(VALIDATION_ERROR_DELAY)/86400
priority = random.choice(priority_weight) result = readMessage(processing_node=processing_node, to_date=now_date)
# Try to find a message at given priority level
result = readMessage(processing_node=processing_node, priority=priority,
to_date=now_date)
if len(result) == 0:
# If empty, take any message
result = readMessage(processing_node=processing_node, priority=None,to_date=now_date)
if len(result) > 0: if len(result) > 0:
line = result[0] line = result[0]
path = line.path path = line.path
...@@ -103,23 +97,17 @@ class SQLQueue(RAMQueue): ...@@ -103,23 +97,17 @@ class SQLQueue(RAMQueue):
try: try:
m = self.loadMessage(line.message) m = self.loadMessage(line.message)
# Make sure object exists # Make sure object exists
validation_state = m.validate(self, activity_tool) validation_state = m.validate(self, activity_tool, check_order_validation=0)
if validation_state is not VALID: if validation_state is not VALID:
if validation_state in (EXCEPTION, INVALID_PATH):
if line.priority > MAX_PRIORITY: if line.priority > MAX_PRIORITY:
# This is an error. # This is an error.
# Assign message back to 'error' state. # Assign message back to 'error' state.
activity_tool.SQLQueue_assignMessage(uid=line.uid, activity_tool.SQLQueue_assignMessage(uid=line.uid,
processing_node = VALIDATE_ERROR_STATE) processing_node=VALIDATE_ERROR_STATE)
get_transaction().commit() # and commit get_transaction().commit() # and commit
else: else:
# Lower priority # Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, priority = line.priority + 1) activity_tool.SQLQueue_setPriority(uid=line.uid, priority=line.priority + 1)
get_transaction().commit() # Release locks before starting a potentially long calculation
else:
# We do not lower priority for INVALID_ORDER errors but we do postpone execution
activity_tool.SQLQueue_setPriority(uid = line.uid, date = next_processing_date,
priority = line.priority)
get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit() # Release locks before starting a potentially long calculation
return 0 return 0
...@@ -139,8 +127,8 @@ class SQLQueue(RAMQueue): ...@@ -139,8 +127,8 @@ class SQLQueue(RAMQueue):
# An exception happens at somewhere else but invoke, so messages # An exception happens at somewhere else but invoke, so messages
# themselves should not be delayed. # themselves should not be delayed.
try: try:
activity_tool.SQLQueue_setPriority(uid = line.uid, date = line.date, activity_tool.SQLQueue_setPriority(uid=line.uid, date=line.date,
priority = line.priority) priority=line.priority)
get_transaction().commit() get_transaction().commit()
except: except:
LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised, and cannot even set processing to zero due to an exception', LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised, and cannot even set processing to zero due to an exception',
...@@ -162,22 +150,23 @@ class SQLQueue(RAMQueue): ...@@ -162,22 +150,23 @@ class SQLQueue(RAMQueue):
if type(m.exc_type) is ClassType \ if type(m.exc_type) is ClassType \
and issubclass(m.exc_type, ConflictError): and issubclass(m.exc_type, ConflictError):
activity_tool.SQLQueue_setPriority(uid = line.uid, activity_tool.SQLQueue_setPriority(uid=line.uid,
date = next_processing_date, date=next_processing_date,
priority = line.priority) priority=line.priority)
elif line.priority > MAX_PRIORITY: elif line.priority > MAX_PRIORITY:
# This is an error # This is an error
activity_tool.SQLQueue_assignMessage(uid = line.uid, activity_tool.SQLQueue_assignMessage(uid=line.uid,
processing_node = INVOKE_ERROR_STATE) processing_node=INVOKE_ERROR_STATE)
# Assign message back to 'error' state # Assign message back to 'error' state
m.notifyUser(activity_tool) # Notify Error m.notifyUser(activity_tool) # Notify Error
else: else:
# Lower priority # Lower priority
activity_tool.SQLQueue_setPriority(uid=line.uid, date = next_processing_date, activity_tool.SQLQueue_setPriority(uid=line.uid, date=next_processing_date,
priority = line.priority + 1) priority=line.priority + 1)
get_transaction().commit() get_transaction().commit()
except: except:
LOG('SQLQueue', ERROR, 'SQLQueue.dequeueMessage raised an exception during checking for the results of processed messages', LOG('SQLQueue', ERROR,
'SQLQueue.dequeueMessage raised an exception during checking for the results of processed messages',
error=sys.exc_info()) error=sys.exc_info())
raise raise
return 0 return 0
...@@ -215,32 +204,45 @@ class SQLQueue(RAMQueue): ...@@ -215,32 +204,45 @@ class SQLQueue(RAMQueue):
# Parse each message in registered # Parse each message in registered
for m in activity_tool.getRegisteredMessageList(self): for m in activity_tool.getRegisteredMessageList(self):
if object_path == m.object_path and (method_id is None or method_id == m.method_id): if object_path == m.object_path and (method_id is None or method_id == m.method_id):
if invoke: activity_tool.invoke(m) if invoke:
# First Validate
validate_value = m.validate(self, activity_tool)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked
# The message no longer exists
raise ActivityFlushError, (
'Could not evaluate %s on %s' % (m.method_id , path))
elif validate_value is INVALID_PATH:
# The message no longer exists
raise ActivityFlushError, (
'The document %s does not exist' % path)
else:
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
activity_tool.unregisterMessage(self, m) activity_tool.unregisterMessage(self, m)
# Parse each message in SQL queue # Parse each message in SQL queue
#LOG('Flush', 0, str((path, invoke, method_id))) result = readMessageList(path=path, method_id=method_id, processing_node=None)
result = readMessageList(path=path, method_id=method_id,processing_node=None)
#LOG('Flush', 0, str(len(result)))
method_dict = {}
for line in result: for line in result:
path = line.path path = line.path
method_id = line.method_id method_id = line.method_id
if not method_dict.has_key(method_id):
# Only invoke once (it would be different for a queue)
method_dict[method_id] = 1
m = self.loadMessage(line.message, uid = line.uid) m = self.loadMessage(line.message, uid = line.uid)
if invoke: if invoke:
# First Validate # First Validate
if m.validate(self, activity_tool) is VALID: validate_value = m.validate(self, activity_tool)
if validate_value is VALID:
activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ?? activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
if not m.is_executed: # Make sure message could be invoked if not m.is_executed: # Make sure message could be invoked
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'Could not evaluate %s on %s' % (method_id , path)) 'Could not evaluate %s on %s' % (method_id , path))
else: elif validate_value is INVALID_PATH:
# The message no longer exists # The message no longer exists
raise ActivityFlushError, ( raise ActivityFlushError, (
'The document %s does not exist' % path) 'The document %s does not exist' % path)
else:
raise ActivityFlushError, (
'Could not validate %s on %s' % (m.method_id , path))
if len(result): if len(result):
activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result]) activity_tool.SQLQueue_delMessage(uid = [line.uid for line in result])
...@@ -265,31 +267,27 @@ class SQLQueue(RAMQueue): ...@@ -265,31 +267,27 @@ class SQLQueue(RAMQueue):
message_list.append(m) message_list.append(m)
return message_list return message_list
def countMessage(self, activity_tool, tag=None,path=None, def countMessage(self, activity_tool, tag=None, path=None,
method_id=None,message_uid=None,**kw): method_id=None, message_uid=None, **kw):
""" """Return the number of messages which match the given parameters.
Return the number of message which match the given parameter.
""" """
if isinstance(tag, StringType): if isinstance(tag, str):
tag = [tag] tag = [tag]
if isinstance(path, StringType): if isinstance(path, str):
path = [path] path = [path]
if isinstance(message_uid, (int,long)): if isinstance(method_id, str):
message_uid = [message_uid]
if isinstance(method_id, StringType):
method_id = [method_id] method_id = [method_id]
result = activity_tool.SQLQueue_validateMessageList(method_id=method_id, result = activity_tool.SQLQueue_validateMessageList(method_id=method_id,
path=path, path=path,
message_uid=message_uid, message_uid=message_uid,
tag=tag) tag=tag,
count=1)
return result[0].uid_count return result[0].uid_count
def countMessageWithTag(self, activity_tool, value): def countMessageWithTag(self, activity_tool, value):
"""Return the number of messages which match the given tag.
""" """
Return the number of message which match the given tag. return self.countMessage(activity_tool, tag=value)
"""
return self.countMessage(activity_tool,tag=value)
def dumpMessageList(self, activity_tool): def dumpMessageList(self, activity_tool):
# Dump all messages in the table. # Dump all messages in the table.
...@@ -306,105 +304,115 @@ class SQLQueue(RAMQueue): ...@@ -306,105 +304,115 @@ class SQLQueue(RAMQueue):
processing_node = 1 processing_node = 1
readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None) readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
if readMessageList is not None: if readMessageList is not None:
result = readMessageList(path=None, method_id=None, processing_node = -1) # Only assign non assigned messages now_date = DateTime()
#LOG('distribute count',0,str(len(result)) ) result = readMessageList(path=None, method_id=None,
#LOG('distribute count',0,str(map(lambda x:x.uid, result))) processing_node=-1, to_date=now_date)
#get_transaction().commit() # Release locks before starting a potentially long calculation get_transaction().commit()
result = list(result)[0:100]
validation_text_dict = {'none': 1}
message_dict = {}
for line in result: for line in result:
broadcast = line.broadcast message = self.loadMessage(line.message, uid=line.uid)
uid = line.uid message.order_validation_text = self.getOrderValidationText(message)
self.getExecutableMessageList(activity_tool, message, message_dict,
validation_text_dict)
# XXX probably this below can be optimized by assigning multiple messages at a time.
path_dict = {}
assignMessage = activity_tool.SQLQueue_assignMessage
processing_node = 1
for message in message_dict.itervalues():
path = '/'.join(message.object_path)
broadcast = message.activity_kw.get('broadcast', 0)
if broadcast: if broadcast:
# Broadcast messages must be distributed into all nodes. # Broadcast messages must be distributed into all nodes.
activity_tool.SQLQueue_assignMessage(processing_node=1, uid=uid) assignMessage(processing_node=1, uid=message.uid)
if node_count > 1: if node_count > 1:
uid_list = activity_tool.getPortalObject().portal_ids.generateNewLengthIdList(id_group='portal_activity_queue', id_count=node_count - 1) id_tool = activity_tool.getPortalObject().portal_ids
for node in range(2, node_count+1): uid_list = id_tool.generateNewLengthIdList(id_group='portal_activity_queue',
activity_tool.SQLQueue_writeMessage(uid = uid_list.pop(), id_count=node_count - 1)
path = line.path, priority = message.activity_kw.get('priority', 1)
method_id = line.method_id, dumped_message = self.dumpMessage(message)
priority = line.priority, date = message.activity_kw.get('at_date', now_date)
broadcast = 1, tag = message.activity_kw.get('tag', '')
processing_node = node, for node in xrange(2, node_count+1):
message = line.message, activity_tool.SQLQueue_writeMessage(uid=uid_list.pop(),
date = line.date) path=path,
method_id=message.method_id,
priority=priority,
broadcast=1,
processing_node=node,
message=dumped_message,
date=date,
tag=tag)
get_transaction().commit()
else: else:
#LOG("distribute", 0, "assign %s" % uid) # Select a processing node. If the same path appears again, dispatch the message to
activity_tool.SQLQueue_assignMessage(uid=uid, processing_node=processing_node) # the same node, so that object caching is more efficient. Otherwise, apply a round
#get_transaction().commit() # Release locks immediately to allow processing of messages # robin scheduling.
processing_node = processing_node + 1 node = path_dict.get(path)
if node is None:
node = processing_node
path_dict[path] = node
processing_node += 1
if processing_node > node_count: if processing_node > node_count:
processing_node = 1 # Round robin processing_node = 1
assignMessage(processing_node=node, uid=message.uid, broadcast=0)
get_transaction().commit() # Release locks immediately to allow processing of messages
# Validation private methods # Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None):
if isinstance(method_id, str):
method_id = [method_id]
if isinstance(path, str):
path = [path]
if isinstance(tag, str):
tag = [tag]
if method_id or message_uid or path or tag:
validateMessageList = activity_tool.SQLQueue_validateMessageList
result = validateMessageList(method_id=method_id,
message_uid=message_uid,
path=path,
tag=tag)
message_list = []
for line in result:
m = self.loadMessage(line.message,
uid=line.uid,
date=line.date,
processing_node=line.processing_node)
m.order_validation_text = self.getOrderValidationText(m)
message_list.append(m)
return message_list
else:
return []
def _validate_after_method_id(self, activity_tool, message, value): def _validate_after_method_id(self, activity_tool, message, value):
# Count number of occurances of method_id return self._validate(activity_tool, method_id=value)
#get_transaction().commit()
if type(value) == type(''):
value = [value]
result = activity_tool.SQLQueue_validateMessageList(method_id=value, message_uid=None, path=None)
#LOG('SQLQueue._validate_after_method_id, method_id',0,value)
#LOG('SQLQueue._validate_after_method_id, result[0].uid_count',0,result[0].uid_count)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_path(self, activity_tool, message, value): def _validate_after_path(self, activity_tool, message, value):
# Count number of occurances of path return self._validate(activity_tool, path=value)
if type(value) == type(''):
value = [value]
result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, path=value)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_message_uid(self, activity_tool, message, value): def _validate_after_message_uid(self, activity_tool, message, value):
# Count number of occurances of message_uid return self._validate(activity_tool, message_uid=value)
result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=value, path=None)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_path_and_method_id(self, activity_tool, message, value): def _validate_after_path_and_method_id(self, activity_tool, message, value):
# Count number of occurances of method_id and path if not isinstance(value, (tuple, list)) or len(value) < 2:
if (type(value) != type( (0,) ) and type(value) != type ([])) or len(value)<2: LOG('CMFActivity', WARNING,
LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_path_and_method : %s' % repr(value)) 'unable to recognize value for after_path_and_method: %r' % (value,))
return VALID return []
path = value[0] return self._validate(activity_tool, path=value[0], method_id=value[1])
method = value[1]
if type(path) == type(''):
path = [path]
if type(method) == type(''):
method = [method]
result = activity_tool.SQLQueue_validateMessageList(method_id=method, message_uid=None, path=path)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_tag(self, activity_tool, message, value): def _validate_after_tag(self, activity_tool, message, value):
# Count number of occurances of tag return self._validate(activity_tool, tag=value)
if type(value) == type(''):
value = [value]
result = activity_tool.SQLQueue_validateMessageList(method_id=None, message_uid=None, tag=value)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
def _validate_after_tag_and_method_id(self, activity_tool, message, value): def _validate_after_tag_and_method_id(self, activity_tool, message, value):
# Count number of occurances of tag and method_id if not isinstance(value, (tuple, list)) or len(value) < 2:
if (type(value) != type ( (0,) ) and type(value) != type([])) or len(value)<2: LOG('CMFActivity', WARNING,
LOG('CMFActivity WARNING :', 0, 'unable to recognize value for after_tag_and_method_id : %s' % repr(value)) 'unable to recognize value for after_tag_and_method_id: %r' % (value,))
return VALID return []
tag = value[0] return self._validate(activity_tool, tag=value[0], method_id=value[1])
method = value[1]
if type(tag) == type(''):
tag = [tag]
if type(method) == type(''):
method = [method]
result = activity_tool.SQLQueue_validateMessageList(method_id=method, message_uid=None, tag=tag)
if result[0].uid_count > 0:
return INVALID_ORDER
return VALID
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, activity_tool, delay, processing_node = None): def timeShift(self, activity_tool, delay, processing_node = None):
...@@ -412,6 +420,6 @@ class SQLQueue(RAMQueue): ...@@ -412,6 +420,6 @@ class SQLQueue(RAMQueue):
To simulate timeShift, we simply substract delay from To simulate timeShift, we simply substract delay from
all dates in SQLDict message table all dates in SQLDict message table
""" """
activity_tool.SQLQueue_timeShift(delay = delay, processing_node = processing_node) activity_tool.SQLQueue_timeShift(delay=delay, processing_node=processing_node)
registerActivity(SQLQueue) registerActivity(SQLQueue)
...@@ -89,11 +89,11 @@ class Message: ...@@ -89,11 +89,11 @@ class Message:
Message instances are stored in an activity queue, inside the Activity Tool. Message instances are stored in an activity queue, inside the Activity Tool.
""" """
def __init__(self, object, active_process, activity_kw, method_id, args, kw): def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
if type(object) is StringType: if isinstance(obj, str):
self.object_path = object.split('/') self.object_path = obj.split('/')
else: else:
self.object_path = object.getPhysicalPath() self.object_path = obj.getPhysicalPath()
if type(active_process) is StringType: if type(active_process) is StringType:
self.active_process = active_process.split('/') self.active_process = active_process.split('/')
elif active_process is None: elif active_process is None:
...@@ -198,8 +198,13 @@ class Message: ...@@ -198,8 +198,13 @@ class Message:
if hasattr(activity_tool, 'error_log'): if hasattr(activity_tool, 'error_log'):
activity_tool.error_log.raising(sys.exc_info()) activity_tool.error_log.raising(sys.exc_info())
def validate(self, activity, activity_tool): def validate(self, activity, activity_tool, check_order_validation=1):
return activity.validate(activity_tool, self, **self.activity_kw) return activity.validate(activity_tool, self,
check_order_validation=check_order_validation,
**self.activity_kw)
def getDependentMessageList(self, activity, activity_tool):
return activity.getDependentMessageList(activity_tool, self, **self.activity_kw)
def notifyUser(self, activity_tool, message="Failed Processing Activity"): def notifyUser(self, activity_tool, message="Failed Processing Activity"):
"""Notify the user that the activity failed.""" """Notify the user that the activity failed."""
...@@ -827,7 +832,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -827,7 +832,7 @@ class ActivityTool (Folder, UniqueObject):
'could not dump messages from %s' % 'could not dump messages from %s' %
(activity,), error=sys.exc_info()) (activity,), error=sys.exc_info())
if hasattr(folder, 'SQLDict_createMessageTable'): if getattr(folder, 'SQLDict_createMessageTable', None) is not None:
try: try:
folder.SQLDict_dropMessageTable() folder.SQLDict_dropMessageTable()
except ConflictError: except ConflictError:
...@@ -838,7 +843,7 @@ class ActivityTool (Folder, UniqueObject): ...@@ -838,7 +843,7 @@ class ActivityTool (Folder, UniqueObject):
error=sys.exc_info()) error=sys.exc_info())
folder.SQLDict_createMessageTable() folder.SQLDict_createMessageTable()
if hasattr(folder, 'SQLQueue_createMessageTable'): if getattr(folder, 'SQLQueue_createMessageTable', None) is not None:
try: try:
folder.SQLQueue_dropMessageTable() folder.SQLQueue_dropMessageTable()
except ConflictError: except ConflictError:
...@@ -920,16 +925,24 @@ class ActivityTool (Folder, UniqueObject): ...@@ -920,16 +925,24 @@ class ActivityTool (Folder, UniqueObject):
self.immediateReindexObject() self.immediateReindexObject()
# Active synchronisation methods # Active synchronisation methods
security.declarePrivate('validateOrder')
def validateOrder(self, message, validator_id, validation_value): def validateOrder(self, message, validator_id, validation_value):
message_list = self.getDependentMessageList(message, validator_id, validation_value)
return len(message_list) > 0
security.declarePrivate('getDependentMessageList')
def getDependentMessageList(self, message, validator_id, validation_value):
global is_initialized global is_initialized
if not is_initialized: self.initialize() if not is_initialized: self.initialize()
message_list = []
for activity in activity_list: for activity in activity_list:
method_id = "_validate_%s" % validator_id method_id = "_validate_%s" % validator_id
if hasattr(activity, method_id): method = getattr(activity, method_id, None)
if getattr(activity,method_id)(aq_inner(self), if method is not None:
message, validation_value): result = method(aq_inner(self), message, validation_value)
return 1 if result:
return 0 message_list.extend([(activity, m) for m in result])
return message_list
# Required for tests (time shift) # Required for tests (time shift)
def timeShift(self, delay): def timeShift(self, delay):
......
...@@ -2,12 +2,12 @@ ...@@ -2,12 +2,12 @@
title: title:
connection_id:cmf_activity_sql_connection connection_id:cmf_activity_sql_connection
max_rows:1000 max_rows:1000
max_cache:100 max_cache:0
cache_time:0 cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>processing_node:int=-1</params> <params>processing_node</params>
UPDATE UPDATE
message message
SET SET
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
title: title:
connection_id:cmf_activity_sql_connection connection_id:cmf_activity_sql_connection
max_rows:1000 max_rows:1000
max_cache:100 max_cache:0
cache_time:0 cache_time:0
class_name: class_name:
class_file: class_file:
......
...@@ -13,5 +13,6 @@ SET ...@@ -13,5 +13,6 @@ SET
processing_date = <dtml-sqlvar "_.DateTime()" type="datetime">, processing_date = <dtml-sqlvar "_.DateTime()" type="datetime">,
processing = 1 processing = 1
WHERE WHERE
<dtml-in uid>uid = <dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else> uid IN (
OR </dtml-if></dtml-in> <dtml-in uid><dtml-sqlvar sequence-item type="int"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
...@@ -11,19 +11,9 @@ class_file: ...@@ -11,19 +11,9 @@ class_file:
method_id method_id
processing_node processing_node
priority priority
to_processing_date include_processing
include_processing</params> to_date</params>
<dtml-if to_processing_date>UPDATE message SELECT * FROM
SET
processing = 0
WHERE
processing = 1
AND
processing_date < <dtml-sqlvar to_processing_date type="datetime">
<dtml-var "'\0'">
</dtml-if>SELECT * FROM
message message
WHERE WHERE
1 = 1 1 = 1
...@@ -34,5 +24,6 @@ WHERE ...@@ -34,5 +24,6 @@ WHERE
<dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if> <dtml-if priority> AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if> <dtml-if path>AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if> <dtml-if method_id> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
ORDER BY ORDER BY
priority, date, uid priority, date, uid
<dtml-comment> <dtml-comment>
title: title:
connection_id:cmf_activity_sql_connection connection_id:cmf_activity_sql_connection
max_rows:10000 max_rows:1000
max_cache:100 max_cache:0
cache_time:0 cache_time:0
class_name: class_name:
class_file: class_file:
......
...@@ -11,32 +11,31 @@ class_file: ...@@ -11,32 +11,31 @@ class_file:
message_uid message_uid
path path
tag tag
count
</params> </params>
SELECT SELECT
COUNT(DISTINCT uid) as uid_count <dtml-if count>
COUNT(*) AS uid_count
<dtml-else>
*
</dtml-if>
FROM FROM
message message
WHERE WHERE
processing_node >= -2 processing_node >= -2
<dtml-if method_id> <dtml-if method_id>
AND ( AND method_id IN (
<dtml-in method_id> <dtml-in method_id><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
method_id = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
</dtml-in>
) )
</dtml-if> </dtml-if>
<dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if> <dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
<dtml-if path> <dtml-if path>
AND ( AND path IN (
<dtml-in path> <dtml-in path><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
path = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
</dtml-in>
) )
</dtml-if> </dtml-if>
<dtml-if tag> <dtml-if tag>
AND ( AND tag IN (
<dtml-in tag> <dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
tag = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
</dtml-in>
) )
</dtml-if> </dtml-if>
...@@ -11,7 +11,7 @@ class_file: ...@@ -11,7 +11,7 @@ class_file:
processing_node processing_node
method_id method_id
broadcast broadcast
uid:int=0</params> uid</params>
UPDATE message_queue UPDATE message_queue
SET SET
processing_node=<dtml-sqlvar processing_node type="int">, processing_node=<dtml-sqlvar processing_node type="int">,
......
...@@ -7,7 +7,7 @@ cache_time:0 ...@@ -7,7 +7,7 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>processing_node:int=-1</params> <params>processing_node</params>
UPDATE UPDATE
message_queue message_queue
SET SET
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
title: title:
connection_id:cmf_activity_sql_connection connection_id:cmf_activity_sql_connection
max_rows:1000 max_rows:1000
max_cache:100 max_cache:0
cache_time:0 cache_time:0
class_name: class_name:
class_file: class_file:
......
...@@ -19,4 +19,4 @@ WHERE ...@@ -19,4 +19,4 @@ WHERE
<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if> <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
ORDER BY ORDER BY
priority, date priority, date, uid
...@@ -10,7 +10,8 @@ class_file: ...@@ -10,7 +10,8 @@ class_file:
<params>path <params>path
method_id method_id
processing_node processing_node
priority</params> priority
to_date</params>
SELECT * FROM SELECT * FROM
message_queue message_queue
WHERE WHERE
...@@ -19,3 +20,6 @@ WHERE ...@@ -19,3 +20,6 @@ WHERE
<dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if> <dtml-if priority>AND priority = <dtml-sqlvar priority type="int"> </dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if> <dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if>
<dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> <dtml-if method_id>AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
<dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
ORDER BY
priority, date, uid
<dtml-comment> <dtml-comment>
title: title:
connection_id:cmf_activity_sql_connection connection_id:cmf_activity_sql_connection
max_rows:10000 max_rows:1000
max_cache:100 max_cache:0
cache_time:0 cache_time:0
class_name: class_name:
class_file: class_file:
...@@ -16,5 +16,5 @@ SELECT uid FROM ...@@ -16,5 +16,5 @@ SELECT uid FROM
WHERE WHERE
processing <> 1 processing <> 1
<dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if> <dtml-if processing_node> AND processing_node = <dtml-sqlvar processing_node type="int"></dtml-if>
<dtml-if path>AND path = <dtml-sqlvar path type="string"></dtml-if> <dtml-if path> AND path = <dtml-sqlvar path type="string"></dtml-if>
<dtml-if to_date>AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if> <dtml-if to_date> AND date <= <dtml-sqlvar to_date type="datetime"> </dtml-if>
...@@ -11,32 +11,31 @@ class_file: ...@@ -11,32 +11,31 @@ class_file:
message_uid message_uid
path path
tag tag
count
</params> </params>
SELECT SELECT
COUNT(DISTINCT uid) as uid_count <dtml-if count>
COUNT(*) AS uid_count
<dtml-else>
*
</dtml-if>
FROM FROM
message_queue message_queue
WHERE WHERE
processing_node >= -2 processing_node >= -2
<dtml-if method_id> <dtml-if method_id>
AND ( AND method_id IN (
<dtml-in method_id> <dtml-in method_id><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
method_id = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
</dtml-in>
) )
</dtml-if> </dtml-if>
<dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if> <dtml-if message_uid>AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
<dtml-if path> <dtml-if path>
AND ( AND path IN (
<dtml-in path> <dtml-in path><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
path = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
</dtml-in>
) )
</dtml-if> </dtml-if>
<dtml-if tag> <dtml-if tag>
AND ( AND tag IN (
<dtml-in tag> <dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
tag = <dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else> OR </dtml-if>
</dtml-in>
) )
</dtml-if> </dtml-if>
...@@ -13,7 +13,7 @@ method_id ...@@ -13,7 +13,7 @@ method_id
message message
priority priority
broadcast broadcast
processing_node=-1 processing_node
date date
tag</params> tag</params>
INSERT INTO message_queue INSERT INTO message_queue
...@@ -22,7 +22,9 @@ SET ...@@ -22,7 +22,9 @@ SET
path = <dtml-sqlvar path type="string">, path = <dtml-sqlvar path type="string">,
<dtml-if date>date = <dtml-sqlvar date type="datetime">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="datetime">, </dtml-if> <dtml-if date>date = <dtml-sqlvar date type="datetime">, <dtml-else>date = <dtml-sqlvar "_.DateTime()" type="datetime">, </dtml-if>
method_id = <dtml-sqlvar method_id type="string">, method_id = <dtml-sqlvar method_id type="string">,
<dtml-if processing_node>
processing_node = <dtml-sqlvar processing_node type="int">, processing_node = <dtml-sqlvar processing_node type="int">,
</dtml-if>
broadcast = <dtml-sqlvar broadcast type="int">, broadcast = <dtml-sqlvar broadcast type="int">,
processing = -1, processing = -1,
priority = <dtml-sqlvar priority type="int">, priority = <dtml-sqlvar priority type="int">,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment