1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
# Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import pickle
from Products.CMFActivity.ActivityTool import Message
from zLOG import LOG
class Queue:
    """
    Minimal activity queue: life-cycle flags plus no-op storage hooks.

    The storage-related methods (queueMessage, dequeueMessage, hasActivity,
    flush, getMessageList) do nothing useful here; presumably concrete
    queues subclass this class and override them -- TODO confirm against
    the subclasses.

    Roadmap:

      Step 1: use lists

      Step 2: add some object related dict which prevents calling twice
              the same method

      Step 3: add some time information for deferred execution

      Step 4: use MySQL as a way to store events (with locks)

      Step 5: use periodic Timer to wakeup Scheduler

      Step 6: add multiple threads on a single Scheduler

      Step 7: add control thread to kill "events which last too long"

    Some data:

      - reindexObject = 50 ms

      - calling a MySQL read = 0.7 ms

      - calling a simple method by HTTP = 30 ms

      - calling a complex method by HTTP = 500 ms

    References:

      http://www.mysql.com/doc/en/InnoDB_locking_reads.html
      http://www.python.org/doc/current/lib/thread-objects.html
      http://www-poleia.lip6.fr/~briot/actalk/actalk.html
    """

    #scriptable_method_id_list = ['appendMessage', 'nextMessage', 'delMessage']

    def __init__(self):
        """Start alive, asleep and not yet initialized."""
        self.is_alive = 1
        self.is_awake = 0
        self.is_initialized = 0

    def initialize(self, activity_tool):
        """Remember activity_tool on the first call; later calls are no-ops."""
        # This is the only moment when
        # we can set some global variables related
        # to the ZODB context
        if not self.is_initialized:
            self.activity_tool = activity_tool
            self.is_initialized = 1

    def queueMessage(self, activity_tool, m):
        """Hook for storing message m; does nothing in this class."""
        pass

    def dequeueMessage(self, activity_tool):
        """Hook for processing one message; does nothing (returns None) here."""
        pass

    def tic(self, activity_tool):
        """Run one dequeue step and go to sleep if it returns a true value."""
        # Tic should return quickly
        if self.dequeueMessage(activity_tool):
            self.sleep(activity_tool)

    def sleep(self, activity_tool):
        """Clear the awake flag."""
        self.is_awake = 0

    def wakeup(self, activity_tool):
        """Set the awake flag."""
        self.is_awake = 1

    def terminate(self, activity_tool):
        """Clear both the awake and the alive flags."""
        self.is_awake = 0
        self.is_alive = 0

    def validate(self, activity_tool, message, wait_for=None, **kw):
        """
        Tell whether message may be executed now.

        Returns 0 when the object at message.object_path cannot be
        traversed from activity_tool (traversal raises or yields None), or
        when the optional wait_for callable returns a true value.
        Returns 1 otherwise. Extra keyword arguments are accepted and
        ignored.
        """
        try:
            # Only the traversal is guarded: KeyboardInterrupt and SystemExit
            # are not Exception subclasses, so they propagate instead of being
            # reported as an inaccessible object.
            target = activity_tool.unrestrictedTraverse(message.object_path)
        except Exception:
            LOG('WARNING ActivityTool', 0,
                'Object %s could not be accessed' % '/'.join(message.object_path))
            # Do not try to call methods on objects which cause errors
            return 0
        if target is None:
            # Do not try to call methods on objects which do not exist
            LOG('WARNING ActivityTool', 0,
                'Object %s does not exist' % '/'.join(message.object_path))
            return 0
        if wait_for is not None and wait_for():
            return 0
        return 1

    def isAwake(self, activity_tool):
        """Return the awake flag."""
        return self.is_awake

    def hasActivity(self, activity_tool, object, **kw):
        """Hook reporting pending activity for object; always 0 here."""
        return 0

    def flush(self, activity_tool, object, **kw):
        """Hook for flushing messages related to object; does nothing here."""
        pass

    def loadMessage(self, s):
        """Return the object unpickled from the string s."""
        # NOTE(security): pickle.loads can execute arbitrary code. Only pass
        # data that was produced by dumpMessage and stored in a trusted place.
        return pickle.loads(s)

    def dumpMessage(self, m):
        """Return m serialized with pickle."""
        return pickle.dumps(m)

    def getMessageList(self, activity_tool):
        """Hook listing queued messages; always an empty list here."""
        return []