1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
from __future__ import absolute_import
##############################################################################
#
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
# Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from random import getrandbits
from zLOG import LOG, TRACE, INFO, WARNING, ERROR, PANIC
import MySQLdb
from MySQLdb.constants.ER import DUP_ENTRY
from .SQLBase import (
SQLBase, sort_message_key,
UID_SAFE_BITSIZE, UID_ALLOCATION_TRY_COUNT,
)
from Products.CMFActivity.ActivityTool import Message
from .SQLDict import SQLDict
from six.moves import xrange
class SQLJoblib(SQLDict):
"""
An extention of SQLDict, It is non transatactional and follow always-excute paradigm.
It uses a dictionary to store results and with hash of arguments as keys
"""
sql_table = 'message_job'
uid_group = 'portal_activity_job'
def createTableSQL(self):
return """\
CREATE TABLE %s (
`uid` BIGINT UNSIGNED NOT NULL,
`date` DATETIME(6) NOT NULL,
`path` VARCHAR(255) NOT NULL,
`active_process_uid` INT UNSIGNED NULL,
`method_id` VARCHAR(255) NOT NULL,
`processing_node` SMALLINT NOT NULL DEFAULT -1,
`priority` TINYINT NOT NULL DEFAULT 0,
`group_method_id` VARCHAR(255) NOT NULL DEFAULT '',
`tag` VARCHAR(255) NOT NULL,
`signature` BINARY(16) NOT NULL,
`serialization_tag` VARCHAR(255) NOT NULL,
`retry` TINYINT UNSIGNED NOT NULL DEFAULT 0,
`message` LONGBLOB NOT NULL,
PRIMARY KEY (`uid`),
KEY `processing_node_priority_date` (`processing_node`, `priority`, `date`),
KEY `node_group_priority_date` (`processing_node`, `group_method_id`, `priority`, `date`),
KEY `serialization_tag_processing_node` (`serialization_tag`, `processing_node`),
KEY (`path`),
KEY (`active_process_uid`),
KEY (`method_id`),
KEY (`tag`)
) ENGINE=InnoDB""" % self.sql_table
def generateMessageUID(self, m):
return (tuple(m.object_path), m.method_id, m.activity_kw.get('signature'),
m.activity_kw.get('tag'), m.activity_kw.get('group_id'))
_insert_template = ("INSERT INTO %s (uid,"
" path, active_process_uid, date, method_id, processing_node,"
" priority, group_method_id, tag, signature, serialization_tag,"
" message) VALUES\n(%s)")
def prepareQueueMessageList(self, activity_tool, message_list):
db = activity_tool.getSQLConnection()
quote = db.string_literal
def insert(reset_uid):
values = self._insert_separator.join(values_list)
del values_list[:]
for _ in xrange(UID_ALLOCATION_TRY_COUNT):
if reset_uid:
reset_uid = False
# Overflow will result into IntegrityError.
db.query("SET @uid := %s" % getrandbits(UID_SAFE_BITSIZE))
try:
db.query(self._insert_template % (self.sql_table, values))
except MySQLdb.IntegrityError as e:
if e.args[0] != DUP_ENTRY:
raise
reset_uid = True
else:
break
else:
raise ValueError("Maximum retry for prepareQueueMessageList reached")
i = 0
reset_uid = True
values_list = []
max_payload = self._insert_max_payload
sep_len = len(self._insert_separator)
hasDependency = self._hasDependency
for m in message_list:
if m.is_registered:
active_process_uid = m.active_process_uid
date = m.activity_kw.get('at_date')
row = ','.join((
b'@uid+%s' % i,
quote('/'.join(m.object_path)),
b'NULL' if active_process_uid is None else str(active_process_uid),
b"UTC_TIMESTAMP(6)" if date is None else quote(render_datetime(date)),
quote(m.method_id),
b'-1' if hasDependency(m) else b'0',
bytes(m.activity_kw.get('priority', 1)),
quote(m.getGroupId()),
quote(m.activity_kw.get('tag', '')),
quote(m.activity_kw.get('signature', '')),
quote(m.activity_kw.get('serialization_tag', '')),
quote(Message.dump(m))))
i += 1
n = sep_len + len(row)
max_payload -= n
if max_payload < 0:
if values_list:
insert(reset_uid)
reset_uid = False
max_payload = self._insert_max_payload - n
else:
raise ValueError("max_allowed_packet too small to insert message")
values_list.append(row)
if values_list:
insert(reset_uid)
def getProcessableMessageLoader(self, db, processing_node):
path_and_method_id_dict = {}
quote = db.string_literal
def load(line):
# getProcessableMessageList already fetch messages with the same
# group_method_id, so what remains to be filtered on are path, method_id
# and signature
path = line.path
method_id = line.method_id
key = path, method_id
uid = line.uid
original_uid = path_and_method_id_dict.get(key)
if original_uid is None:
m = Message.load(line.message, uid=uid, line=line)
try:
# Select duplicates.
result = db.query(b"SELECT uid FROM message_job"
" WHERE processing_node = 0 AND path = %s AND signature = %s"
" AND method_id = %s AND group_method_id = %s FOR UPDATE" % (
quote(path), quote(line.signature),
quote(method_id), quote(line.group_method_id),
), 0)[1]
uid_list = [x for x, in result]
if uid_list:
self.assignMessageList(db, processing_node, uid_list)
else:
db.query("COMMIT") # XXX: useful ?
except:
self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK")
raise
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
path_and_method_id_dict[key] = uid
return m, uid, uid_list
# We know that original_uid != uid because caller skips lines we returned
# earlier.
return None, original_uid, [uid]
return load
def getPriority(self, activity_tool, processing_node, node_set):
return SQLDict.getPriority(self, activity_tool, processing_node)
def getReservedMessageList(self, db, date, processing_node,
limit=None, group_method_id=None, node_set=None):
return SQLDict.getReservedMessageList(self, db,
date, processing_node, limit, group_method_id)