SQLDict.py 18.6 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
30
from Queue import VALID, INVALID_PATH
Jean-Paul Smets's avatar
Jean-Paul Smets committed
31
from RAMDict import RAMDict
32
from Products.CMFActivity.Errors import ActivityFlushError
33
import sys
34
#from time import time
35
from SQLBase import SQLBase, sort_message_key
Jean-Paul Smets's avatar
Jean-Paul Smets committed
36

37
import transaction
38

39
from zLOG import LOG, TRACE, WARNING, ERROR, INFO, PANIC
Jean-Paul Smets's avatar
Jean-Paul Smets committed
40

41 42
# Stop validating more messages when this limit is reached: distribute()
# returns as soon as it has assigned at least this many messages in one call.
MAX_VALIDATED_LIMIT = 1000 
Yoshinori Okuji's avatar
Yoshinori Okuji committed
43
# Read up to this number of messages to validate.
44
READ_MESSAGE_LIMIT = 1000
Jean-Paul Smets's avatar
Jean-Paul Smets committed
45

46 47
# Maximum number of messages handed to a single SQLDict_writeMessageList
# call by prepareQueueMessageList.
MAX_MESSAGE_LIST_SIZE = 100

48
class SQLDict(RAMDict, SQLBase):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
49 50 51 52 53
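  """
    A dict-like activity queue whose messages are stored in the SQL table
    named by sql_table and accessed through the SQLDict_* and SQLBase_*
    methods of the activity tool. Messages sharing the same
    (object_path, method_id, tag, group_id) are treated as duplicates
    (see generateMessageUID and distribute).
  """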
54
  sql_table = 'message'
55
  merge_duplicate = True
56

Jean-Paul Smets's avatar
Jean-Paul Smets committed
57
  # Transaction commit methods
58
  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Store the registered messages of message_list in the SQL table.

      Messages whose is_registered flag is false are ignored. The others are
      written by batches of at most MAX_MESSAGE_LIST_SIZE messages, each
      batch going through one activity_tool.SQLDict_writeMessageList call
      with uids freshly generated by portal_ids.
    """
    pending_list = [m for m in message_list if m.is_registered]
    for start in xrange(0, len(pending_list), MAX_MESSAGE_LIST_SIZE):
      batch = pending_list[start:start + MAX_MESSAGE_LIST_SIZE]

      def kw_column(key, default):
        # One activity_kw value per message of the batch, in batch order.
        return [message.activity_kw.get(key, default) for message in batch]

      path_list = ['/'.join(message.object_path) for message in batch]
      active_process_uid_list = [message.active_process_uid for message in batch]
      method_id_list = [message.method_id for message in batch]
      priority_list = kw_column('priority', 1)
      dumped_message_list = [self.dumpMessage(message) for message in batch]
      date_list = kw_column('at_date', None)
      # group_method_id and group_id share one column, separated by a NUL.
      group_method_id_list = [
        '\0'.join((message.activity_kw.get('group_method_id', ''),
                   message.activity_kw.get('group_id', '')))
        for message in batch]
      tag_list = kw_column('tag', '')
      serialization_tag_list = kw_column('serialization_tag', '')
      order_validation_text_list = [self.getOrderValidationText(message)
                                    for message in batch]
      # The uid_list is also stored in the ZODB (note kept from the original)
      portal_ids = activity_tool.getPortalObject().portal_ids
      uid_list = portal_ids.generateNewIdList(id_generator='uid',
                                              id_group='portal_activity',
                                              id_count=len(batch))
      activity_tool.SQLDict_writeMessageList(
        uid_list=uid_list,
        path_list=path_list,
        active_process_uid_list=active_process_uid_list,
        method_id_list=method_id_list,
        priority_list=priority_list,
        message_list=dumped_message_list,
        date_list=date_list,
        group_method_id_list=group_method_id_list,
        tag_list=tag_list,
        serialization_tag_list=serialization_tag_list,
        processing_node_list=None,
        order_validation_text_list=order_validation_text_list)
90

91 92
  def prepareDeleteMessage(self, activity_tool, m):
    """
      Delete, with a single SQLBase_delMessage call, every stored message
      that has the same path, method_id and order validation text as m.
      Nothing is deleted when no such message is found.
    """
    message_path = '/'.join(m.object_path)
    validation_text = self.getOrderValidationText(m)
    line_list = activity_tool.SQLDict_readUidList(
      path=message_path,
      method_id=m.method_id,
      order_validation_text=validation_text)
    matching_uid_list = [line.uid for line in line_list]
    if matching_uid_list:
      activity_tool.SQLBase_delMessage(table=self.sql_table,
                                       uid=matching_uid_list)
100

101 102 103 104 105 106 107 108
  def finishQueueMessage(self, activity_tool_path, m):
    """
      Hook called once a message has been queued. SQLDict has nothing to do.
    """

  def finishDeleteMessage(self, activity_tool_path, m):
    """
      Hook called once a message has been deleted. SQLDict has nothing to do.
    """

109
  # Registration management
Jean-Paul Smets's avatar
Jean-Paul Smets committed
110
  def registerActivityBuffer(self, activity_buffer):
    """
      Registration hook for an activity buffer. SQLDict does nothing here.
    """
112

113 114
  def generateMessageUID(self, m):
    """
      Return the tuple used to recognise duplicates of m:
      (object_path as a tuple, method_id, tag, group_id).
      tag and group_id are None when absent from m.activity_kw.
    """
    object_path = tuple(m.object_path)
    method_id = m.method_id
    activity_kw = m.activity_kw
    return (object_path, method_id,
            activity_kw.get('tag'), activity_kw.get('group_id'))
115

Jean-Paul Smets's avatar
Jean-Paul Smets committed
116 117
  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """
      Unregister message m.

      The message is flagged as not registered, so that
      prepareQueueMessageList and getRegisteredMessageList skip it, and its
      uid (see generateMessageUID) is discarded from the uid set kept by
      activity_buffer for this activity. activity_tool is not used.
    """
    m.is_registered = 0 # This prevents from inserting deleted messages into the queue
    # discard() is a no-op when the uid is not in the set.
    activity_buffer.getUidSet(self).discard(self.generateMessageUID(m))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
121 122

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """
      Return the messages that activity_buffer holds for this activity and
      whose is_registered flag is still true. activity_tool is not used.
    """
    return [message
            for message in activity_buffer.getMessageList(self)
            if message.is_registered]
125

126
  def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
    """
      Select the messages having the same path, method_id, group_method_id
      and order_validation_text as line, reserve them for processing_node
      and return the list of their uids.

      When nothing is selected, SQLDict_commit is called to release locks.
      On any exception the error is logged, SQLDict_rollback is called and
      the exception is re-raised.
    """
    try:
      duplicate_line_list = activity_tool.SQLDict_selectDuplicatedLineList(
        path=line.path,
        method_id=line.method_id,
        group_method_id=line.group_method_id,
        order_validation_text=line.order_validation_text,
      )
      uid_list = [duplicate.uid for duplicate in duplicate_line_list]
      if not uid_list:
        # Release locks
        activity_tool.SQLDict_commit()
      else:
        activity_tool.SQLDict_reserveDuplicatedLineList(
          processing_node=processing_node,
          uid_list=uid_list,
        )
    except:
      # Log, release the lock, then let the caller see the original error.
      LOG('SQLDict', WARNING, 'getDuplicateMessageUidList got an exception', error=sys.exc_info())
      activity_tool.SQLDict_rollback()
      raise
    return uid_list
155

156
  # Bind SQLBase's implementation explicitly. RAMDict comes first in the
  # base class list -- NOTE(review): presumably it defines its own
  # dequeueMessage that would otherwise win; confirm.
  dequeueMessage = SQLBase.dequeueMessage
Jean-Paul Smets's avatar
Jean-Paul Smets committed
157

158
  def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
    """
      Tell whether SQLDict_hasMessage reports at least one message matching
      the given criteria.

      object may be None, in which case no path is given to the query;
      otherwise its physical path, joined with '/', is used.
      Returns 0 when SQLDict_hasMessage is not available on activity_tool
      or returns no row; otherwise whether the first row's message_count
      is positive.
    """
    hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
    if hasMessage is None:
      return 0
    my_object_path = None
    if object is not None:
      my_object_path = '/'.join(object.getPhysicalPath())
    result = hasMessage(path=my_object_path,
                        method_id=method_id,
                        only_valid=only_valid,
                        active_process_uid=active_process_uid)
    if len(result) > 0:
      return result[0].message_count > 0
    return 0

Jean-Paul Smets's avatar
Jean-Paul Smets committed
170
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove the messages targeting object_path, optionally invoking them.

      object_path is a tuple
      method_id, when given, restricts the flush to that method.
      invoke, when true, makes each distinct method_id be validated and
      invoked once (first among the messages registered in the activity
      tool, then among the messages read from SQL) before deletion.

      commit allows to choose mode
        - if we commit, then we make sure no locks are taken for too long
        - if we do not commit, then we can use flush in a larger transaction

      commit should in general not be used

      NOTE: committing is very likely nonsense here. We should just avoid
      flushing as much as possible.
      commit and the extra keyword arguments are accepted but not used by
      this implementation.

      Nothing is done when activity_tool has no SQLDict_readMessageList.
      Raises ActivityFlushError when a message to invoke fails validation
      or is not in MESSAGE_EXECUTED state after being invoked.
    """
    path = '/'.join(object_path)
    # method ids already handled; keeps each method from being invoked twice
    method_dict = {}
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is not None:
      # Parse each message in registered
      for m in activity_tool.getRegisteredMessageList(self):
        if m.object_path == object_path and (method_id is None or method_id == m.method_id):
          if m.method_id not in method_dict:
            method_dict[m.method_id] = 1 # Prevents calling invoke twice
            if invoke:
              self._invokeFlushedMessage(activity_tool, m, path)
          activity_tool.unregisterMessage(self, m)
      # Parse each message in SQL dict
      result = readMessageList(path=path, method_id=method_id,
                               processing_node=None,include_processing=0, to_date=None)
      for line in result:
        # NOTE(review): path is rebound to the row's path and this value is
        # what SQLDict_readUidList receives below. Rows were selected with
        # path=path, so it is presumably the same string -- confirm.
        path = line.path
        line_method_id = line.method_id
        if line_method_id not in method_dict:
          # Only invoke once (it would be different for a queue)
          # This is optimisation with the goal to process objects on the same
          # node and minimize network traffic with ZEO server
          method_dict[line_method_id] = 1
          m = self.loadMessage(line.message, uid=line.uid, line=line)
          if invoke:
            # Validate only if message is marked as new
            self._invokeFlushedMessage(activity_tool, m, path,
                                       validate=(line.processing_node == -1))

      if len(result):
        uid_list = activity_tool.SQLDict_readUidList(path = path, method_id = method_id,
                                                     order_validation_text=None)
        if len(uid_list)>0:
          activity_tool.SQLBase_delMessage(table=self.sql_table,
                                           uid=[x.uid for x in uid_list])

  def _invokeFlushedMessage(self, activity_tool, m, path, validate=True):
    # Validate m (unless validate is false, in which case it is taken as
    # VALID), invoke it, and raise ActivityFlushError on any failure.
    # path is only used to build the error messages.
    if validate:
      validate_value = m.validate(self, activity_tool)
    else:
      validate_value = VALID
    if validate_value is VALID:
      activity_tool.invoke(m) # Try to invoke the message - what happens if invoke calls flushActivity ??
      if m.getExecutionState() != MESSAGE_EXECUTED:
        # The message could not be invoked
        raise ActivityFlushError(
            'Could not evaluate %s on %s' % (m.method_id , path))
    elif validate_value is INVALID_PATH:
      # The message no longer exists
      raise ActivityFlushError(
          'The document %s does not exist' % path)
    else:
      raise ActivityFlushError(
          'Could not validate %s on %s' % (m.method_id , path))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
249

250
  # Bind SQLBase's implementation explicitly. RAMDict comes first in the
  # base class list -- NOTE(review): presumably it defines its own
  # getMessageList that would otherwise win; confirm.
  getMessageList = SQLBase.getMessageList
251

252 253 254
  def dumpMessageList(self, activity_tool):
    """
      Return the list of all messages of the table, each row returned by
      SQLDict_dumpMessageList being loaded with loadMessage.
      An empty list is returned when activity_tool has no
      SQLDict_dumpMessageList.
    """
    dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
    if dumpMessageList is None:
      return []
    return [self.loadMessage(line.message, uid=line.uid, line=line)
            for line in dumpMessageList()]

Jean-Paul Smets's avatar
Jean-Paul Smets committed
263
  def distribute(self, activity_tool, node_count):
    """
      Validate pending messages and assign them to processing_node 0.

      Rows with processing_node=-1 are read by pages of READ_MESSAGE_LIMIT
      (with to_date set to the current date). For each page:
        - executable messages are collected with getExecutableMessageList,
        - among messages sharing the same generateMessageUID, only the first
          one after sorting with sort_message_key is kept, the others are
          deleted,
        - for each serialization_tag only the first message (same sort) is
          let through, plus the following ones having the same non-None
          group_method_id as that first message,
        - retained uids are assigned with SQLBase_assignMessage.
      The method returns when a page is empty or once at least
      MAX_VALIDATED_LIMIT messages were assigned. node_count is not used.
      Nothing is done when activity_tool has no SQLDict_readMessageList.
    """
    offset = 0
    readMessageList = getattr(activity_tool, 'SQLDict_readMessageList', None)
    if readMessageList is None:
      return
    now_date = self.getNow(activity_tool)
    validated_count = 0
    while True:
      result = readMessageList(path=None, method_id=None, processing_node=-1,
                               to_date=now_date, include_processing=0,
                               offset=offset, count=READ_MESSAGE_LIMIT)
      if not result:
        return
      transaction.commit()

      validation_text_dict = {'none': 1}
      message_dict = {}
      for line in result:
        loaded = self.loadMessage(
          line.message, uid=line.uid, line=line,
          order_validation_text=line.order_validation_text)
        self.getExecutableMessageList(activity_tool, loaded, message_dict,
                                      validation_text_dict, now_date=now_date)

      if message_dict:
        deletable_uid_list = []
        distributable_uid_set = set()
        serialization_tag_dict = {}
        message_unique_dict = {}

        # remove duplicates
        # SQLDict considers object_path, method_id, tag to unify activities,
        # but ignores method arguments. They are outside of semantics.
        for message in message_dict.itervalues():
          unique_key = self.generateMessageUID(message)
          message_unique_dict.setdefault(unique_key, []).append(message)
        for duplicate_list in message_unique_dict.itervalues():
          if len(duplicate_list) > 1:
            # Sort list of duplicates to keep the message with highest score
            duplicate_list.sort(key=sort_message_key)
            deletable_uid_list.extend(
              [duplicate.uid for duplicate in duplicate_list[1:]])
          kept = duplicate_list[0]
          serialization_tag = kept.activity_kw.get('serialization_tag')
          if serialization_tag is not None:
            serialization_tag_dict.setdefault(serialization_tag,
                                              []).append(kept)
          else:
            distributable_uid_set.add(kept.uid)

        # Don't let through if there is the same serialization tag in the
        # message dict. If there is the same serialization tag, only one can
        # be validated and others must wait.
        # But messages with group_method_id are exceptions. serialization_tag
        # does not stop validating together. Because those messages should
        # be processed together at once.
        for serialized_list in serialization_tag_dict.itervalues():
          # Sort list of messages to validate the message with highest score
          serialized_list.sort(key=sort_message_key)
          first = serialized_list[0]
          distributable_uid_set.add(first.uid)
          group_method_id = first.activity_kw.get('group_method_id')
          if group_method_id is not None:
            for follower in serialized_list[1:]:
              if group_method_id == follower.activity_kw.get('group_method_id'):
                distributable_uid_set.add(follower.uid)

        if deletable_uid_list:
          activity_tool.SQLBase_delMessage(table=self.sql_table,
                                           uid=deletable_uid_list)
        distributable_count = len(distributable_uid_set)
        if distributable_count:
          activity_tool.SQLBase_assignMessage(
            table=self.sql_table,
            processing_node=0,
            uid=tuple(distributable_uid_set))
          validated_count += distributable_count
          if validated_count >= MAX_VALIDATED_LIMIT:
            return
      offset += READ_MESSAGE_LIMIT
Jean-Paul Smets's avatar
Jean-Paul Smets committed
336

337
  # Validation private methods
338 339
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
                serialization_tag=None):
    """
      Return the list of messages, loaded with loadMessage, that
      SQLDict_validateMessageList selects for the given criteria.

      method_id, path and tag may each be given as a single string, which is
      then wrapped in a list. An empty list is returned, without querying,
      when all criteria are false.
    """
    def listify(value):
      # Wrap a lone string in a list, leave anything else untouched.
      if isinstance(value, str):
        return [value]
      return value

    method_id = listify(method_id)
    path = listify(path)
    tag = listify(tag)

    if not (method_id or message_uid or path or tag or serialization_tag):
      return []
    validateMessageList = activity_tool.SQLDict_validateMessageList
    result = validateMessageList(method_id=method_id,
                                 message_uid=message_uid,
                                 path=path,
                                 tag=tag,
                                 count=False,
                                 serialization_tag=serialization_tag)
    return [self.loadMessage(line.message,
                             line=line,
                             uid=line.uid,
                             date=line.date,
                             processing_node=line.processing_node,
                             order_validation_text=line.order_validation_text)
            for line in result]

  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """Return the number of messages which match the given parameters.

    tag, path and method_id may each be given as a single string, which is
    then wrapped in a list. The count is the uid_count of the first row
    returned by SQLDict_validateMessageList called with count=1. Extra
    keyword arguments are ignored.
    """
    def listify(value):
      # Wrap a lone string in a list, leave anything else untouched.
      if isinstance(value, str):
        return [value]
      return value

    tag = listify(tag)
    path = listify(path)
    method_id = listify(method_id)
    count_result = activity_tool.SQLDict_validateMessageList(
      method_id=method_id,
      path=path,
      message_uid=message_uid,
      tag=tag,
      serialization_tag=None,
      count=1)
    return count_result[0].uid_count

386
  def countMessageWithTag(self, activity_tool, value):
    """Return the number of messages which match the given tag.

    Shortcut for countMessage with value passed as the tag.
    """
    tagged_count = self.countMessage(activity_tool, tag=value)
    return tagged_count
390

391
  # Required for tests (time shift)
392
  def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
    """
      To simulate a time shift, we simply subtract delay from
      all dates in the SQLDict message table. The work is delegated to
      activity_tool.SQLDict_timeShift, which also receives processing_node
      and retry.
    """
    activity_tool.SQLDict_timeShift(
      delay=delay,
      processing_node=processing_node,
      retry=retry)
398

399 400 401 402 403
  def getPriority(self, activity_tool):
    """
      Return the result of _getPriority called with the
      SQLDict_getPriority method of activity_tool and, as default,
      the value of RAMDict.getPriority.
    """
    sql_method = activity_tool.SQLDict_getPriority
    fallback = RAMDict.getPriority(self, activity_tool)
    return self._getPriority(activity_tool, sql_method, fallback)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
404
# Register the SQLDict class through ActivityTool's registerActivity.
registerActivity(SQLDict)