Commit 73d89df3 authored by Julien Muchembled

CMFActivity: optimization, cleanup, limit insertion by size in bytes instead of number of rows

parents 7fb53e8f 3ca5bf97
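
The headline change shows up in the SQLBase/SQLJoblib hunks below: multi-row INSERTs are no longer flushed every MAX_MESSAGE_LIST_SIZE rows but whenever the accumulated VALUES payload would exceed a byte budget, so a single statement stays under MySQL's max_allowed_packet. A minimal standalone sketch of that batching idea, with illustrative names (batch_insert, max_payload) that are not part of the actual code:

# Sketch only: flush a multi-row INSERT when the next row would push the
# statement past a byte budget, instead of after a fixed number of rows.
def batch_insert(db, insert_header, quoted_rows, max_payload, separator=',\n'):
    sep_len = len(separator)
    values_list = []
    budget = max_payload
    for row in quoted_rows:          # each row is an already-quoted "(...)" string
        cost = sep_len + len(row)
        budget -= cost
        if budget < 0:
            if not values_list:
                raise ValueError("max_allowed_packet too small for a single row")
            db.query(insert_header + separator.join(values_list))
            values_list = []
            budget = max_payload - cost
        values_list.append(row)
    if values_list:
        db.query(insert_header + separator.join(values_list))
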
......@@ -33,14 +33,6 @@ from zLOG import LOG, WARNING, ERROR
from ZODB.POSException import ConflictError
from cStringIO import StringIO
import transaction
# Error values for message validation
EXCEPTION = -1
VALID = 0
INVALID_PATH = 1
INVALID_ORDER = 2
# Time global parameters
MAX_PROCESSING_TIME = 900 # in seconds
VALIDATION_ERROR_DELAY = 15 # in seconds
......@@ -96,52 +88,6 @@ class Queue(object):
def distribute(self, activity_tool, node_count):
raise NotImplementedError
def validate(self, activity_tool, message, check_order_validation=1, **kw):
"""
This is the place where activity semantics is implemented
**kw contains all parameters which allow implementing synchronisation,
constraints, delays, etc.
Standard synchronisation parameters:
after_method_id -- never validate message if after_method_id
is in the list of methods which are
going to be executed
after_message_uid -- never validate message if after_message_uid
is in the list of messages which are
going to be executed
after_path -- never validate message if after_path
is in the list of paths which are
going to be executed
"""
try:
if activity_tool.unrestrictedTraverse(message.object_path, None) is None:
# Do not try to call methods on objects which do not exist
LOG('CMFActivity', WARNING,
'Object %s does not exist' % '/'.join(message.object_path))
return INVALID_PATH
if check_order_validation:
for k, v in kw.iteritems():
if activity_tool.validateOrder(message, k, v):
return INVALID_ORDER
except ConflictError:
raise
except:
LOG('CMFActivity', WARNING,
'Validation of Object %s raised exception' % '/'.join(message.object_path),
error=sys.exc_info())
# Do not try to call methods on objects which cause errors
return EXCEPTION
return VALID
def getDependentMessageList(self, activity_tool, message):
message_list = []
for k, v in message.activity_kw.iteritems():
message_list += activity_tool.getDependentMessageList(message, k, v)
return message_list
def getExecutableMessageList(self, activity_tool, message, message_dict,
validation_text_dict, now_date=None):
"""Get messages which have no dependent message, and store them in the dictionary.
......@@ -165,8 +111,7 @@ class Queue(object):
cached_result = validation_text_dict.get(message.order_validation_text)
if cached_result is None:
message_list = self.getDependentMessageList(activity_tool, message)
transaction.commit() # Release locks.
message_list = activity_tool.getDependentMessageList(message, self)
if message_list:
# The result is not empty, so this message is not executable.
validation_text_dict[message.order_validation_text] = 0
......@@ -189,9 +134,6 @@ class Queue(object):
elif cached_result:
message_dict[message.uid] = message
def hasActivity(self, activity_tool, object, processing_node=None, active_process=None, **kw):
return 0
def flush(self, activity_tool, object, **kw):
pass
......@@ -201,7 +143,7 @@ class Queue(object):
key_list = message.activity_kw.keys()
key_list.sort()
for key in key_list:
method_id = "_validate_%s" % key
method_id = "_validate_" + key
if getattr(self, method_id, None) is not None:
order_validation_item_list.append((key, message.activity_kw[key]))
if len(order_validation_item_list) == 0:
......@@ -216,14 +158,6 @@ class Queue(object):
def getMessageList(self, activity_tool, processing_node=None,**kw):
return []
def countMessage(self, activity_tool,**kw):
return 0
def countMessageWithTag(self, activity_tool,value):
"""Return the number of messages which match the given tag.
"""
return self.countMessage(activity_tool, tag=value)
# Transaction Management
def prepareQueueMessageList(self, activity_tool, message_list):
# Called to prepare transaction commit for queued messages
......
......@@ -26,6 +26,7 @@
#
##############################################################################
from Shared.DC.ZRDB.Results import Results
from Products.CMFActivity.ActivityTool import Message
import sys
#from time import time
......@@ -74,8 +75,9 @@ class SQLDict(SQLBase):
message_list = activity_buffer.getMessageList(self)
return [m for m in message_list if m.is_registered]
def getProcessableMessageLoader(self, activity_tool, processing_node):
def getProcessableMessageLoader(self, db, processing_node):
path_and_method_id_dict = {}
quote = db.string_literal
def load(line):
# getProcessableMessageList already fetches messages with the same
# group_method_id, so what remains to be filtered on are path and
......@@ -87,6 +89,8 @@ class SQLDict(SQLBase):
uid = line.uid
original_uid = path_and_method_id_dict.get(key)
if original_uid is None:
sql_method_id = " AND method_id = %s AND group_method_id = %s" % (
quote(method_id), quote(line.group_method_id))
m = Message.load(line.message, uid=uid, line=line)
merge_parent = m.activity_kw.get('merge_parent')
try:
......@@ -101,11 +105,14 @@ class SQLDict(SQLBase):
path_list.append(path)
uid_list = []
if path_list:
result = activity_tool.SQLDict_selectParentMessage(
path=path_list,
method_id=method_id,
group_method_id=line.group_method_id,
processing_node=processing_node)
# Select parent messages.
result = Results(db.query("SELECT * FROM message"
" WHERE processing_node IN (0, %s) AND path IN (%s)%s"
" ORDER BY path LIMIT 1 FOR UPDATE" % (
processing_node,
','.join(map(quote, path_list)),
sql_method_id,
), 0))
if result: # found a parent
# mark child as duplicate
uid_list.append(uid)
......@@ -115,29 +122,32 @@ class SQLDict(SQLBase):
uid = line.uid
m = Message.load(line.message, uid=uid, line=line)
# return unreserved similar children
result = activity_tool.SQLDict_selectChildMessageList(
path=line.path,
method_id=method_id,
group_method_id=line.group_method_id)
reserve_uid_list = [x.uid for x in result]
path = line.path
result = db.query("SELECT uid FROM message"
" WHERE processing_node = 0 AND (path = %s OR path LIKE %s)"
"%s FOR UPDATE" % (
quote(path), quote(path.replace('_', r'\_') + '/%'),
sql_method_id,
), 0)[1]
reserve_uid_list = [x for x, in result]
uid_list += reserve_uid_list
if not line.processing_node:
# reserve found parent
reserve_uid_list.append(uid)
else:
result = activity_tool.SQLDict_selectDuplicatedLineList(
path=path,
method_id=method_id,
group_method_id=line.group_method_id)
reserve_uid_list = uid_list = [x.uid for x in result]
# Select duplicates.
result = db.query("SELECT uid FROM message"
" WHERE processing_node = 0 AND path = %s%s FOR UPDATE" % (
quote(path), sql_method_id,
), 0)[1]
reserve_uid_list = uid_list = [x for x, in result]
if reserve_uid_list:
activity_tool.SQLDict_reserveDuplicatedLineList(
processing_node=processing_node, uid=reserve_uid_list)
self.assignMessageList(db, processing_node, reserve_uid_list)
else:
activity_tool.SQLDict_commit() # release locks
db.query("COMMIT") # XXX: useful ?
except:
self._log(WARNING, 'getDuplicateMessageUidList got an exception')
activity_tool.SQLDict_rollback() # release locks
self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK")
raise
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
......
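
The SQLDict loader above now talks to the database directly: it locks candidate duplicate rows with SELECT ... FOR UPDATE and then hands them to the current node via assignMessageList, where it previously went through one ZSQL method per case (SQLDict_selectParentMessage, SQLDict_selectDuplicatedLineList, SQLDict_reserveDuplicatedLineList). A rough sketch of that reserve pattern, assuming the same db.query(sql, max_rows) connection API used in the diff and a plain UPDATE standing in for assignMessageList, whose implementation is not shown here:

def reserve_duplicates(db, table, processing_node, path, method_id, group_method_id):
    # Lock unassigned duplicates of the same path/method inside the current
    # transaction so that other nodes cannot pick them up concurrently.
    quote = db.string_literal
    result = db.query(
        "SELECT uid FROM %s"
        " WHERE processing_node = 0 AND path = %s"
        " AND method_id = %s AND group_method_id = %s FOR UPDATE" % (
            table, quote(path), quote(method_id), quote(group_method_id)), 0)[1]
    uid_list = [uid for uid, in result]
    if uid_list:
        # Assumed equivalent of assignMessageList: mark the locked rows as
        # owned by this node.
        db.query("UPDATE %s SET processing_node = %s WHERE uid IN (%s)" % (
            table, processing_node, ','.join(map(str, uid_list))))
    else:
        db.query("COMMIT")  # nothing reserved; release the locks taken by FOR UPDATE
    return uid_list
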
......@@ -31,7 +31,7 @@ from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY
from SQLBase import (
SQLBase, sort_message_key, MAX_MESSAGE_LIST_SIZE,
SQLBase, sort_message_key,
UID_SAFE_BITSIZE, UID_ALLOCATION_TRY_COUNT,
)
from Products.CMFActivity.ActivityTool import Message
......@@ -45,77 +45,103 @@ class SQLJoblib(SQLDict):
sql_table = 'message_job'
uid_group = 'portal_activity_job'
def initialize(self, activity_tool, clear):
"""
Initialize the message table using MYISAM Engine
"""
folder = activity_tool.getPortalObject().portal_skins.activity
try:
createMessageTable = folder.SQLJoblib_createMessageTable
except AttributeError:
return
if clear:
folder.SQLBase_dropMessageTable(table=self.sql_table)
createMessageTable()
else:
src = createMessageTable._upgradeSchema(create_if_not_exists=1,
initialize=self._initialize,
table=self.sql_table)
if src:
LOG('CMFActivity', INFO, "%r table upgraded\n%s"
% (self.sql_table, src))
def createTableSQL(self):
return """\
CREATE TABLE %s (
`uid` BIGINT UNSIGNED NOT NULL,
`date` DATETIME(6) NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`signature` BINARY(16) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY (`tag`)
) ENGINE=InnoDB""" % self.sql_table
def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
_insert_template = ("INSERT INTO %s (uid,"
" path, active_process_uid, date, method_id, processing_node,"
" priority, group_method_id, tag, signature, serialization_tag,"
" message) VALUES\n(%s)")
def prepareQueueMessageList(self, activity_tool, message_list):
registered_message_list = [m for m in message_list if m.is_registered]
portal = activity_tool.getPortalObject()
for i in xrange(0, len(registered_message_list), MAX_MESSAGE_LIST_SIZE):
message_list = registered_message_list[i:i+MAX_MESSAGE_LIST_SIZE]
path_list = ['/'.join(m.object_path) for m in message_list]
active_process_uid_list = [m.active_process_uid for m in message_list]
method_id_list = [m.method_id for m in message_list]
priority_list = [m.activity_kw.get('priority', 1) for m in message_list]
date_list = [m.activity_kw.get('at_date') for m in message_list]
group_method_id_list = [m.getGroupId() for m in message_list]
tag_list = [m.activity_kw.get('tag', '') for m in message_list]
signature_list=[m.activity_kw.get('signature', '') for m in message_list]
serialization_tag_list = [m.activity_kw.get('serialization_tag', '')
for m in message_list]
processing_node_list = []
for m in message_list:
m.order_validation_text = x = self.getOrderValidationText(m)
processing_node_list.append(0 if x == 'none' else -1)
db = activity_tool.getSQLConnection()
quote = db.string_literal
def insert(reset_uid):
values = self._insert_separator.join(values_list)
del values_list[:]
for _ in xrange(UID_ALLOCATION_TRY_COUNT):
if reset_uid:
reset_uid = False
# Overflow will result in an IntegrityError.
db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE))
try:
portal.SQLJoblib_writeMessage(
uid_list=[
getrandbits(UID_SAFE_BITSIZE)
for _ in xrange(len(message_list))
],
path_list=path_list,
active_process_uid_list=active_process_uid_list,
method_id_list=method_id_list,
priority_list=priority_list,
message_list=map(Message.dump, message_list),
group_method_id_list=group_method_id_list,
date_list=date_list,
tag_list=tag_list,
processing_node_list=processing_node_list,
signature_list=signature_list,
serialization_tag_list=serialization_tag_list)
db.query(self._insert_template % (self.sql_table, values))
except MySQLdb.IntegrityError, (code, _):
if code != DUP_ENTRY:
raise
reset_uid = True
else:
break
else:
raise ValueError("Maximum retry for SQLBase_writeMessageList reached")
raise ValueError("Maximum retry for prepareQueueMessageList reached")
i = 0
reset_uid = True
values_list = []
max_payload = self._insert_max_payload
sep_len = len(self._insert_separator)
for m in message_list:
if m.is_registered:
active_process_uid = m.active_process_uid
order_validation_text = m.order_validation_text = \
self.getOrderValidationText(m)
date = m.activity_kw.get('at_date')
row = ','.join((
'@uid+%s' % i,
quote('/'.join(m.object_path)),
'NULL' if active_process_uid is None else str(active_process_uid),
"UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
quote(m.method_id),
'0' if order_validation_text == 'none' else '-1',
str(m.activity_kw.get('priority', 1)),
quote(m.getGroupId()),
quote(m.activity_kw.get('tag', '')),
quote(m.activity_kw.get('signature', '')),
quote(m.activity_kw.get('serialization_tag', '')),
quote(Message.dump(m))))
i += 1
n = sep_len + len(row)
max_payload -= n
if max_payload < 0:
if values_list:
insert(reset_uid)
reset_uid = False
max_payload = self._insert_max_payload - n
else:
raise ValueError("max_allowed_packet too small to insert message")
values_list.append(row)
if values_list:
insert(reset_uid)
def getProcessableMessageLoader(self, activity_tool, processing_node):
def getProcessableMessageLoader(self, db, processing_node):
path_and_method_id_dict = {}
quote = db.string_literal
def load(line):
# getProcessableMessageList already fetches messages with the same
# group_method_id, so what remains to be filtered on are path, method_id
......@@ -128,19 +154,21 @@ class SQLJoblib(SQLDict):
if original_uid is None:
m = Message.load(line.message, uid=uid, line=line)
try:
result = activity_tool.SQLJoblib_selectDuplicatedLineList(
path=path,
method_id=method_id,
group_method_id=line.group_method_id,
signature=line.signature)
reserve_uid_list = uid_list = [x.uid for x in result]
if reserve_uid_list:
activity_tool.SQLBase_reserveMessageList(
table=self.sql_table,
processing_node=processing_node,
uid=reserve_uid_list)
# Select duplicates.
result = db.query("SELECT uid FROM message_job"
" WHERE processing_node = 0 AND path = %s AND signature = %s"
" AND method_id = %s AND group_method_id = %s FOR UPDATE" % (
quote(path), quote(line.signature),
quote(method_id), quote(line.group_method_id),
), 0)[1]
uid_list = [x for x, in result]
if uid_list:
self.assignMessageList(db, processing_node, uid_list)
else:
db.query("COMMIT") # XXX: useful ?
except:
self._log(WARNING, 'getDuplicateMessageUidList got an exception')
self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK")
raise
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
......
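
prepareQueueMessageList above allocates message uids by setting a random @uid session variable and writing rows as @uid+0, @uid+1, ..., retrying with a new random base when the INSERT hits a duplicate-key error. A hedged sketch of just that retry loop (the constants and the statement builder are illustrative; the real values of UID_SAFE_BITSIZE and UID_ALLOCATION_TRY_COUNT come from SQLBase and are not shown in this hunk):

from random import getrandbits
import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY

UID_BITS = 63       # stand-in for UID_SAFE_BITSIZE
TRY_COUNT = 10      # stand-in for UID_ALLOCATION_TRY_COUNT

def insert_with_random_uids(db, insert_statement):
    # insert_statement must reference uids as @uid+0, @uid+1, ... so that a
    # single random base covers every row of the batch.
    for _ in range(TRY_COUNT):
        db.query("SET @uid := %s" % getrandbits(UID_BITS))
        try:
            db.query(insert_statement)
        except MySQLdb.IntegrityError as e:
            if e.args[0] != DUP_ENTRY:
                raise
            # duplicate key: retry with a new random base
        else:
            return
    raise ValueError("could not allocate non-conflicting uids")
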
......@@ -57,6 +57,7 @@ from Products.ERP5Type.UnrestrictedMethod import PrivilegedUser
from zope.site.hooks import setSite
import transaction
from App.config import getConfiguration
from Shared.DC.ZRDB.Results import Results
import Products.Localizer.patches
localizer_lock = Products.Localizer.patches._requests_lock
......@@ -191,7 +192,6 @@ class Message(BaseMessage):
call_traceback = None
exc_info = None
is_executed = MESSAGE_NOT_EXECUTED
processing = None
traceback = None
oid = None
is_registered = False
......@@ -367,11 +367,6 @@ class Message(BaseMessage):
except:
self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
def validate(self, activity, activity_tool, check_order_validation=1):
return activity.validate(activity_tool, self,
check_order_validation=check_order_validation,
**self.activity_kw)
def notifyUser(self, activity_tool, retry=False):
"""Notify the user that the activity failed."""
portal = activity_tool.getPortalObject()
......@@ -655,11 +650,6 @@ class ActivityTool (BaseTool):
activity_timing_log = False
cancel_and_invoke_links_hidden = False
def SQLDict_setPriority(self, **kw):
real_SQLDict_setPriority = getattr(self.aq_parent, 'SQLDict_setPriority')
LOG('ActivityTool', 0, real_SQLDict_setPriority(src__=1, **kw))
return real_SQLDict_setPriority(**kw)
# Filter content (ZMI))
def filtered_meta_types(self, user=None):
# Filters the list of available meta types.
......@@ -670,6 +660,9 @@ class ActivityTool (BaseTool):
meta_types.append(meta_type)
return meta_types
def getSQLConnection(self):
return self.aq_inner.aq_parent.cmf_activity_sql_connection()
def maybeMigrateConnectionClass(self):
connection_id = 'cmf_activity_sql_connection'
sql_connection = getattr(self, connection_id, None)
......@@ -689,6 +682,20 @@ class ActivityTool (BaseTool):
self.maybeMigrateConnectionClass()
for activity in activity_dict.itervalues():
activity.initialize(self, clear=False)
# Remove old skin if any.
skins_tool = self.getPortalObject().portal_skins
name = 'activity'
if (getattr(skins_tool.get(name), '_dirpath', None)
== 'Products.CMFActivity:skins/activity'):
for selection, skins in skins_tool.getSkinPaths():
skins = skins.split(',')
try:
skins.remove(name)
except ValueError:
continue
skins_tool.manage_skinLayers(
add_skin=1, skinname=selection, skinpath=skins)
skins_tool._delObject(name)
def _callSafeFunction(self, batch_function):
return batch_function()
......@@ -1127,14 +1134,16 @@ class ActivityTool (BaseTool):
def hasActivity(self, *args, **kw):
# Check in each queue if the object has deferred tasks
# if no argument is provided, then check on self
if len(args) > 0:
obj = args[0]
if args:
obj, = args
else:
obj = self
for activity in activity_dict.itervalues():
if activity.hasActivity(aq_inner(self), obj, **kw):
return True
return False
path = None if obj is None else '/'.join(obj.getPhysicalPath())
db = self.getSQLConnection()
quote = db.string_literal
return bool(db.query("(%s)" % ") UNION ALL (".join(
activity.hasActivitySQL(quote, path=path, **kw)
for activity in activity_dict.itervalues()))[1])
security.declarePrivate('getActivityBuffer')
def getActivityBuffer(self, create_if_not_found=True):
......@@ -1443,8 +1452,9 @@ class ActivityTool (BaseTool):
"""
if not(isinstance(message_uid_list, list)):
message_uid_list = [message_uid_list]
self.SQLBase_makeMessageListAvailable(table=activity_dict[activity].sql_table,
uid=message_uid_list)
if message_uid_list:
activity_dict[activity].unreserveMessageList(self.getSQLConnection(),
0, message_uid_list)
if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % (
self.absolute_url(), 'view'))
......@@ -1470,8 +1480,8 @@ class ActivityTool (BaseTool):
"""
if not(isinstance(message_uid_list, list)):
message_uid_list = [message_uid_list]
self.SQLBase_delMessage(table=activity_dict[activity].sql_table,
uid=message_uid_list)
activity_dict[activity].deleteMessageList(
self.getSQLConnection(), message_uid_list)
if REQUEST is not None:
return REQUEST.RESPONSE.redirect('%s/%s' % (
self.absolute_url(), 'view'))
......@@ -1523,10 +1533,7 @@ class ActivityTool (BaseTool):
"""
Return the number of messages which match the given tag.
"""
message_count = 0
for activity in activity_dict.itervalues():
message_count += activity.countMessageWithTag(aq_inner(self), value)
return message_count
return self.countMessage(tag=value)
security.declarePublic('countMessage')
def countMessage(self, **kw):
......@@ -1540,10 +1547,11 @@ class ActivityTool (BaseTool):
tag : activities with a particular tag
message_uid : activities with a particular uid
"""
message_count = 0
for activity in activity_dict.itervalues():
message_count += activity.countMessage(aq_inner(self), **kw)
return message_count
db = self.getSQLConnection()
quote = db.string_literal
return sum(x for x, in db.query("(%s)" % ") UNION ALL (".join(
activity.countMessageSQL(quote, **kw)
for activity in activity_dict.itervalues()))[1])
security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
def newActiveProcess(self, REQUEST=None, **kw):
......@@ -1554,23 +1562,31 @@ class ActivityTool (BaseTool):
REQUEST['RESPONSE'].redirect( 'manage_main' )
return obj
# Active synchronisation methods
security.declarePrivate('validateOrder')
def validateOrder(self, message, validator_id, validation_value):
message_list = self.getDependentMessageList(message, validator_id, validation_value)
return len(message_list) > 0
security.declarePrivate('getDependentMessageList')
def getDependentMessageList(self, message, validator_id, validation_value):
message_list = []
method_id = "_validate_" + validator_id
def getDependentMessageList(self, message, validating_queue=None):
activity_kw = message.activity_kw
db = self.getSQLConnection()
quote = db.string_literal
queries = []
for activity in activity_dict.itervalues():
method = getattr(activity, method_id, None)
if method is not None:
result = method(aq_inner(self), message, validation_value)
if result:
message_list += [(activity, m) for m in result]
return message_list
q = activity.getValidationSQL(
quote, activity_kw, activity is validating_queue)
if q:
queries.append(q)
if queries:
message_list = []
for line in Results(db.query("(%s)" % ") UNION ALL (".join(queries))):
activity = activity_dict[line.activity]
m = Message.load(line.message,
line=line,
uid=line.uid,
date=line.date,
processing_node=line.processing_node)
if not hasattr(m, 'order_validation_text'): # BBB
m.order_validation_text = activity.getOrderValidationText(m)
message_list.append((activity, m))
return message_list
return ()
# Required for tests (time shift)
def timeShift(self, delay):
......
#!/bin/sh
set -e
# Small watching script based on Sébastien's idea.
# ideas:
# - more control on what would be displayed
......@@ -32,13 +31,47 @@ INTERVAL=$2
exit 1
}
SELECT=""
for t in message message_queue ; do
SELECT=$SELECT"""
SELECT count(*) AS $t, ${text_group:-method_id}, processing, processing_node AS node, min(priority) AS min_pri, max(priority) AS max_pri FROM $t GROUP BY ${text_group:-method_id}, processing, processing_node ORDER BY node;
SELECT count(*) AS $t, processing, processing_node, min(priority) AS min_pri, max(priority) AS max_pri FROM $t GROUP BY processing, processing_node;
SELECT priority as pri, MIN(timediff(NOW(), date)) AS min, AVG(timediff(NOW() , date)) AS avg, MAX(timediff(NOW() , date)) AS max FROM $t GROUP BY priority;
SELECT count(*) AS ${t}_count FROM $t;
"""
node_priority_cols="processing_node AS node, MIN(priority) AS min_pri, MAX(priority) AS max_pri"
for t in message:dict message_queue:queue message_job:joblib; do
table=${t%:*}
t=${t#*:}
create=$create"
CREATE TEMPORARY TABLE _$t(
n INT UNSIGNED NOT NULL,
${text_group:-method_id} VARCHAR(255) NOT NULL,
processing_node SMALLINT NOT NULL,
priority TINYINT NOT NULL,
min_date DATETIME(6) NOT NULL,
max_date DATETIME(6) NOT NULL,
max_retry TINYINT UNSIGNED NOT NULL
) ENGINE=MEMORY;"
collect=$collect"
INSERT INTO _$t SELECT count(*), ${text_group:-method_id},
processing_node, priority, MIN(date), MAX(date), MAX(retry) FROM $table
GROUP BY processing_node, priority, ${text_group:-method_id};"
select=$select"
SELECT IFNULL(SUM(n),0) AS $t, ${text_group:-method_id},
$node_priority_cols, MAX(max_retry) AS max_retry FROM _$t
GROUP BY processing_node, ${text_group:-method_id}
ORDER BY processing_node, ${text_group:-method_id};
SELECT priority,
TIME_FORMAT(TIMEDIFF(UTC_TIMESTAMP(6), MAX(max_date)), \"%T\") AS min,
TIME_FORMAT(TIMEDIFF(UTC_TIMESTAMP(6), MIN(min_date)), \"%T\") AS max
FROM _$t GROUP BY priority ORDER BY priority;"
[ "$count" ] && {
not_processing=$not_processing" UNION ALL "
count=$count,
}
not_processing=$not_processing"
SELECT IFNULL(SUM(n),0) AS count, $node_priority_cols,
MIN(min_date) AS min_date, MAX(max_date) AS max_date
FROM _$t WHERE processing_node<=0 GROUP BY processing_node"
count=$count"(SELECT IFNULL(SUM(n),0) AS $t FROM _$t) as $t"
total=$total+$t
done
exec watch -n ${INTERVAL:-5} "${MYSQL:-mysql} $MYSQL_OPT --disable-pager -t -e '$SELECT' "
exec watch -n ${INTERVAL:-5} "${MYSQL:-mysql} $MYSQL_OPT --disable-pager -t -e '
SET autocommit=off;$create$collect
SELECT *, $total as total FROM $count;$select
SELECT SUM(count) as count, node, MIN(min_pri) AS min_pri, MAX(max_pri) AS max_pri,
MIN(min_date) AS min_date, MAX(max_date) AS max_date
FROM ($not_processing) as t GROUP BY node;'"
......@@ -50,7 +50,6 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
<th align="left" valign="top">Named Parameters</th>
<th align="left" valign="top">Processing Node</th>
<th align="left" valign="top">Retry</th>
<th align="left" valign="top">Processing</th>
<th align="left" valign="top">Call Traceback</th>
</tr>
<dtml-in expr="getMessageList()">
......@@ -84,11 +83,6 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
</td>
<td align="left" valign="top"><dtml-var processing_node></td>
<td align="left" valign="top"><dtml-var retry></td>
<td align="left" valign="top">
<dtml-if expr="processing is not None">
<dtml-var processing>
</dtml-if>