SQLQueue.py 13 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
30
from Queue import VALID, INVALID_PATH
31
from Products.CMFActivity.Errors import ActivityFlushError
Romain Courteaud's avatar
Romain Courteaud committed
32
from ZODB.POSException import ConflictError
33
from SQLBase import SQLBase, sort_message_key
34
from zExceptions import ExceptionFormatter
Jean-Paul Smets's avatar
Jean-Paul Smets committed
35

36
import transaction
37

38
from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
Jean-Paul Smets's avatar
Jean-Paul Smets committed
39

40 41 42 43 44
# Stop validating more messages when this limit is reached: distribute()
# returns as soon as it has assigned at least this many messages in one call.
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate: this is the page size (`count`) of
# each SQLQueue_readMessageList query issued by distribute().
READ_MESSAGE_LIMIT = 1000

45 46
# Maximum number of messages written by a single SQLQueue_writeMessageList
# call in SQLQueue.prepareQueueMessageList().
MAX_MESSAGE_LIST_SIZE = 100

47
class SQLQueue(SQLBase):
  """
    A simple SQL based queue: messages are stored as rows of the
    `message_queue` table (see `sql_table`) and are read and written
    through the SQLQueue_* and SQLBase_* methods of the activity tool.

    It should be compatible with transactions and provide sequentiality.
    NOTE(review): keeping messages in SQL rather than in the ZODB is
    presumably what avoids conflicts - to be confirmed.
  """
  # Name of the SQL table holding the messages of this queue.
  sql_table = 'message_queue'
  # NOTE(review): not read in this module; presumably consumed by SQLBase
  # to decide whether identical messages are merged - confirm.
  merge_duplicate = False
55 56

  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Write the registered messages of message_list into the SQL queue.

      Messages whose `is_registered` attribute is false are ignored. The
      remaining ones are inserted by batches of at most
      MAX_MESSAGE_LIST_SIZE messages, one SQLQueue_writeMessageList call
      per batch.
    """
    registered_list = [m for m in message_list if m.is_registered]
    for start in xrange(0, len(registered_list), MAX_MESSAGE_LIST_SIZE):
      batch = registered_list[start:start + MAX_MESSAGE_LIST_SIZE]

      def getActivityKwList(key, default=None):
        # One value per message of the current batch.
        return [m.activity_kw.get(key, default) for m in batch]

      # The uid list is generated by portal_ids, so it is also stored in
      # the ZODB.
      portal = activity_tool.getPortalObject()
      uid_list = portal.portal_ids.generateNewIdList(
        id_generator='uid', id_group='portal_activity_queue',
        id_count=len(batch))
      path_list = ['/'.join(m.object_path) for m in batch]
      active_process_uid_list = [m.active_process_uid for m in batch]
      method_id_list = [m.method_id for m in batch]
      priority_list = getActivityKwList('priority', 1)
      date_list = getActivityKwList('at_date')
      group_method_id_list = [m.getGroupId() for m in batch]
      tag_list = getActivityKwList('tag', '')
      serialization_tag_list = getActivityKwList('serialization_tag', '')
      # Messages without any ordering constraint ('none') are written with
      # processing_node 0; the other ones get -1, which is the value
      # distribute() looks for when selecting messages to validate.
      processing_node_list = []
      for m in batch:
        validation_text = self.getOrderValidationText(m)
        m.order_validation_text = validation_text
        if validation_text == 'none':
          processing_node_list.append(0)
        else:
          processing_node_list.append(-1)
      # Dump only once order_validation_text has been set on each message.
      dumped_message_list = [self.dumpMessage(m) for m in batch]
      activity_tool.SQLQueue_writeMessageList(
        uid_list=uid_list,
        path_list=path_list,
        active_process_uid_list=active_process_uid_list,
        method_id_list=method_id_list,
        priority_list=priority_list,
        message_list=dumped_message_list,
        group_method_id_list=group_method_id_list,
        date_list=date_list,
        tag_list=tag_list,
        processing_node_list=processing_node_list,
        serialization_tag_list=serialization_tag_list)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
90

91
  def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
    """
      Tell whether the SQL queue holds at least one matching message.

      object -- document whose physical path is used as filter, or None
                to pass no path to the query
      method_id, only_valid, active_process_uid -- passed as is to
                SQLQueue_hasMessage

      Returns 0 if the activity tool has no SQLQueue_hasMessage method or
      if the query returns no row, otherwise the boolean
      `message_count > 0` of the first row.
    """
    has_message = getattr(activity_tool, 'SQLQueue_hasMessage', None)
    if has_message is None:
      return 0
    object_path = None if object is None else '/'.join(object.getPhysicalPath())
    row_list = has_message(path=object_path, method_id=method_id,
                           only_valid=only_valid,
                           active_process_uid=active_process_uid)
    if len(row_list) > 0:
      return row_list[0].message_count > 0
    return 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
102

103
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove from the queue the messages targeting object_path.

      object_path -- tuple, path of the target document
      invoke      -- if true, every matching message is validated and
                     invoked before being removed; otherwise messages are
                     removed without being executed
      method_id   -- if given, only messages for this method are flushed
      commit, kw  -- not used by this implementation

      Both the messages still registered on the activity tool and the
      ones already stored in the SQL table are processed. Nothing is done
      if the activity tool has no SQLQueue_readMessageList method.

      Raises ActivityFlushError if invoke is true and a message cannot be
      validated or is not in MESSAGE_EXECUTED state after invocation.
    """
    readMessageList = getattr(activity_tool, 'SQLQueue_readMessageList', None)
    if readMessageList is None:
      return
    path = '/'.join(object_path)
    # Parse each message in registered
    for m in activity_tool.getRegisteredMessageList(self):
      if object_path == m.object_path and (method_id is None or method_id == m.method_id):
        if invoke:
          # First Validate
          validate_value = m.validate(self, activity_tool)
          if validate_value is VALID:
            # Try to invoke the message - what happens if invoke calls flushActivity ??
            activity_tool.invoke(m)
            # Make sure message could be invoked
            if m.getExecutionState() != MESSAGE_EXECUTED:
              raise ActivityFlushError(
                  'Could not evaluate %s on %s' % (m.method_id, path))
          elif validate_value is INVALID_PATH:
            # The document targeted by the message no longer exists
            raise ActivityFlushError(
                'The document %s does not exist' % path)
          else:
            raise ActivityFlushError(
                'Could not validate %s on %s' % (m.method_id, path))
        activity_tool.unregisterMessage(self, m)
    # Parse each message in SQL queue
    result = readMessageList(path=path, method_id=method_id, processing_node=None, to_date=None, include_processing=0)
    for line in result:
      # Use dedicated names so that the `path` and `method_id` arguments
      # are not overwritten by per-row values.
      line_path = line.path
      line_method_id = line.method_id
      m = self.loadMessage(line.message, uid=line.uid, line=line)
      if invoke:
        # First Validate (only if message is marked as new)
        if line.processing_node == -1:
          validate_value = m.validate(self, activity_tool)
        else:
          validate_value = VALID
        if validate_value is VALID:
          # Try to invoke the message - what happens if invoke calls flushActivity ??
          activity_tool.invoke(m)
          # Make sure message could be invoked
          if m.getExecutionState() != MESSAGE_EXECUTED:
            raise ActivityFlushError(
                'Could not evaluate %s on %s' % (line_method_id, line_path))
        elif validate_value is INVALID_PATH:
          # The document targeted by the message no longer exists
          raise ActivityFlushError(
              'The document %s does not exist' % line_path)
        else:
          raise ActivityFlushError(
              'Could not validate %s on %s' % (m.method_id, line_path))

    # Delete all the rows which were read, whether invoked or not.
    if len(result):
      activity_tool.SQLBase_delMessage(table=self.sql_table,
                                       uid=[line.uid for line in result])
Jean-Paul Smets's avatar
Jean-Paul Smets committed
160

161 162 163
  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """Return the number of messages which match the given parameters.

    tag, path and method_id may each be given as a single string, in which
    case they are wrapped into a one-element list before being passed to
    SQLQueue_validateMessageList. Extra keyword arguments are ignored.
    """
    def asList(value):
      # A lone string stands for a list of one value.
      if isinstance(value, str):
        return [value]
      return value

    row_list = activity_tool.SQLQueue_validateMessageList(
      method_id=asList(method_id),
      path=asList(path),
      message_uid=message_uid,
      tag=asList(tag),
      serialization_tag=None,
      count=1)
    return row_list[0].uid_count

  def countMessageWithTag(self, activity_tool, value):
    """Return the number of messages whose tag matches `value`.

    This is a shortcut for countMessage() with only the tag given.
    """
    message_count = self.countMessage(activity_tool, tag=value)
    return message_count
Sebastien Robin's avatar
Sebastien Robin committed
183

184 185 186
  def dumpMessageList(self, activity_tool):
    """
      Return all the messages of the table as a list of loaded messages.

      An empty list is returned if the activity tool has no
      SQLQueue_dumpMessageList method.
    """
    dump_method = getattr(activity_tool, 'SQLQueue_dumpMessageList', None)
    if dump_method is None:
      return []
    return [self.loadMessage(line.message, uid=line.uid, line=line)
            for line in dump_method()]
194

195
  def distribute(self, activity_tool, node_count):
    """
      Validate pending messages and assign them to processing node 0.

      Messages with processing_node -1 and a date up to now are read by
      pages of READ_MESSAGE_LIMIT rows. For each page, the executable
      messages are collected with getExecutableMessageList, then:
      - messages without serialization_tag are all assigned;
      - for each serialization_tag, only the first message according to
        sort_message_key is assigned, together with the messages of the
        same tag sharing its group_method_id (unless that id is '\\0').

      The method returns when a page is empty or once at least
      MAX_VALIDATED_LIMIT messages have been assigned. It does nothing if
      the activity tool has no SQLQueue_readMessageList method.
      node_count is not used by this implementation.
    """
    read_message_list = getattr(activity_tool, 'SQLQueue_readMessageList', None)
    if read_message_list is None:
      return
    now_date = self.getNow(activity_tool)
    validated_count = 0
    offset = 0
    while True:
      line_list = read_message_list(path=None, method_id=None,
                                    processing_node=-1, to_date=now_date,
                                    include_processing=0, offset=offset,
                                    count=READ_MESSAGE_LIMIT)
      if not line_list:
        return
      transaction.commit()

      validation_text_dict = {'none': 1}
      message_dict = {}
      for line in line_list:
        message = self.loadMessage(line.message, uid=line.uid, line=line)
        if not hasattr(message, 'order_validation_text'): # BBB
          message.order_validation_text = self.getOrderValidationText(message)
        self.getExecutableMessageList(activity_tool, message, message_dict,
                                      validation_text_dict, now_date=now_date)
      if message_dict:
        uid_set = set()
        tagged_message_dict = {}
        for message in message_dict.itervalues():
          serialization_tag = message.activity_kw.get('serialization_tag')
          if serialization_tag is None:
            uid_set.add(message.uid)
          else:
            tagged_message_dict.setdefault(serialization_tag,
                                           []).append(message)
        for tagged_list in tagged_message_dict.itervalues():
          # Sort list of messages to validate the message with highest score
          tagged_list.sort(key=sort_message_key)
          first_message = tagged_list[0]
          uid_set.add(first_message.uid)
          group_method_id = first_message.line.group_method_id
          # NOTE(review): '\0' presumably means "no group method" - confirm.
          if group_method_id != '\0':
            uid_set.update(
              m.uid for m in tagged_list[1:]
              if group_method_id == m.line.group_method_id)
        if uid_set:
          activity_tool.SQLBase_assignMessage(table=self.sql_table,
            processing_node=0, uid=tuple(uid_set))
          validated_count += len(uid_set)
          if validated_count >= MAX_VALIDATED_LIMIT:
            return
      offset += READ_MESSAGE_LIMIT
245

246
  # Validation private methods
247 248
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
                serialization_tag=None):
    """
      Return the list of messages matching the given criteria.

      method_id, path and tag may each be given as a single string, which
      is then wrapped into a one-element list. If no criterion is true,
      an empty list is returned without querying
      SQLQueue_validateMessageList.
    """
    if isinstance(method_id, str):
      method_id = [method_id]
    if isinstance(path, str):
      path = [path]
    if isinstance(tag, str):
      tag = [tag]

    if not (method_id or message_uid or path or tag or serialization_tag):
      return []

    line_list = activity_tool.SQLQueue_validateMessageList(
      method_id=method_id,
      message_uid=message_uid,
      path=path,
      tag=tag,
      count=False,
      serialization_tag=serialization_tag)
    loaded_message_list = []
    for line in line_list:
      message = self.loadMessage(line.message,
                                 line=line,
                                 uid=line.uid,
                                 date=line.date,
                                 processing_node=line.processing_node)
      if not hasattr(message, 'order_validation_text'): # BBB
        message.order_validation_text = self.getOrderValidationText(message)
      loaded_message_list.append(message)
    return loaded_message_list

278
  # Required for tests (time shift)
279
  def timeShift(self, activity_tool, delay, processing_node=None):
    """
      To simulate a time shift, we simply subtract delay from
      all dates in the SQLQueue message table.

      delay and processing_node are passed as is to SQLQueue_timeShift.
    """
    shift_method = activity_tool.SQLQueue_timeShift
    shift_method(delay=delay, processing_node=processing_node)
285

286 287
  def getPriority(self, activity_tool):
    """
      Return the priority of this queue, as computed by _getPriority from
      the SQLQueue_getPriority method of the activity tool, with the
      result of SQLBase.getPriority as default value.
    """
    return self._getPriority(activity_tool,
                             activity_tool.SQLQueue_getPriority,
                             SQLBase.getPriority(self, activity_tool))

Jean-Paul Smets's avatar
Jean-Paul Smets committed
291
# Register the SQLQueue class through registerActivity, imported from
# Products.CMFActivity.ActivityTool.
registerActivity(SQLQueue)