SQLQueue.py 13.3 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
from Products.CMFActivity.ActivityTool import registerActivity, MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED
30
from Queue import VALID, INVALID_PATH
31
from Products.CMFActivity.Errors import ActivityFlushError
Romain Courteaud's avatar
Romain Courteaud committed
32
from ZODB.POSException import ConflictError
33
from SQLBase import SQLBase, sort_message_key
34
from zExceptions import ExceptionFormatter
Jean-Paul Smets's avatar
Jean-Paul Smets committed
35

36
import transaction
37

38
from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
Jean-Paul Smets's avatar
Jean-Paul Smets committed
39

40 41 42 43 44
# distribute() stops as soon as it has assigned at least this many messages
# during a single call.
MAX_VALIDATED_LIMIT = 1000
# Number of rows distribute() requests per SQLQueue_readMessageList call; it
# is also the step by which distribute() advances its read offset.
READ_MESSAGE_LIMIT = 1000

45 46
# Maximum number of messages prepareQueueMessageList() hands over to a single
# SQLQueue_writeMessageList call.
MAX_MESSAGE_LIST_SIZE = 100

47
class SQLQueue(SQLBase):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
48
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
49 50 51
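    A simple SQL based queue: messages are stored in the SQL table named by
    sql_table. It should be compatible with transactions and provide
    sequentiality.
    NOTE(review): this text used to describe an OOBTree based queue whose
    use of OOBTree should avoid conflicts; that does not match this class.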
Jean-Paul Smets's avatar
Jean-Paul Smets committed
52
  """
53
  sql_table = 'message_queue'
54
  merge_duplicate = False
55 56

  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Write the registered messages of message_list to the SQL table.

      Messages whose is_registered attribute is false are skipped. The
      remaining ones are written by batches of at most MAX_MESSAGE_LIST_SIZE
      messages, with one SQLQueue_writeMessageList call per batch.
    """
    pending_list = [message for message in message_list
                    if message.is_registered]
    for start in xrange(0, len(pending_list), MAX_MESSAGE_LIST_SIZE):
      batch = pending_list[start:start + MAX_MESSAGE_LIST_SIZE]
      # One new uid is requested per message of the batch.
      # (Note kept from the original code: the uid_list is also stored in
      # the ZODB.)
      id_tool = activity_tool.getPortalObject().portal_ids
      uid_list = id_tool.generateNewIdList(id_generator='uid',
                                           id_group='portal_activity_queue',
                                           id_count=len(batch))
      # Every list below is parallel to batch: one entry per message.
      write_kw = {'uid_list': uid_list, 'processing_node_list': None}
      write_kw['path_list'] = ['/'.join(message.object_path)
                               for message in batch]
      write_kw['active_process_uid_list'] = [message.active_process_uid
                                             for message in batch]
      write_kw['method_id_list'] = [message.method_id for message in batch]
      write_kw['priority_list'] = [message.activity_kw.get('priority', 1)
                                   for message in batch]
      write_kw['date_list'] = [message.activity_kw.get('at_date', None)
                               for message in batch]
      write_kw['group_method_id_list'] = [message.getGroupId()
                                          for message in batch]
      write_kw['tag_list'] = [message.activity_kw.get('tag', '')
                              for message in batch]
      write_kw['serialization_tag_list'] = [
        message.activity_kw.get('serialization_tag', '')
        for message in batch]
      write_kw['message_list'] = [self.dumpMessage(message)
                                  for message in batch]
      activity_tool.SQLQueue_writeMessageList(**write_kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
84

85
  def getDuplicateMessageUidList(self, activity_tool, line, processing_node):
    """
      Return the uids of the messages duplicating the given line.

      This implementation ignores its arguments and always returns an empty
      tuple: no message is ever reported as a duplicate (this class also
      sets merge_duplicate to False).
    """
    return tuple()
91

92
  def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
    """
      Tell whether the SQL table holds at least one matching message.

      object is the document whose physical path is used as the path
      criterion; when it is None, path=None is passed to the SQL method.
      method_id, only_valid and active_process_uid are passed unchanged to
      activity_tool.SQLQueue_hasMessage.

      Always returns a boolean: False when activity_tool has no
      SQLQueue_hasMessage, when the query returns no row, or when the
      message_count of the first row is not greater than 0.
    """
    hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
    if hasMessage is None:
      return False
    if object is None:
      my_object_path = None
    else:
      my_object_path = '/'.join(object.getPhysicalPath())
    result = hasMessage(path=my_object_path, method_id=method_id,
                        only_valid=only_valid,
                        active_process_uid=active_process_uid)
    if len(result) > 0:
      return result[0].message_count > 0
    # Used to be "return 0"; False compares equal to 0, so callers relying on
    # truthiness or equality are unaffected.
    return False
Jean-Paul Smets's avatar
Jean-Paul Smets committed
103

104
  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove the messages targeting object_path from this queue, invoking
      them first when invoke is true.

      object_path is a tuple of path components.
      method_id, when not None, restricts the flush to that method.
      commit and the extra keyword arguments are accepted but not used here.

      Two sources are processed in turn: the messages returned by
      activity_tool.getRegisteredMessageList(self), then the rows returned
      by SQLQueue_readMessageList. Nothing at all is done when activity_tool
      has no SQLQueue_readMessageList.

      Raises ActivityFlushError when invoke is true and a message cannot be
      validated, targets a path reported as INVALID_PATH, or is not in the
      MESSAGE_EXECUTED state after invocation.
    """
    read_message_list = getattr(activity_tool, 'SQLQueue_readMessageList',
                                None)
    if read_message_list is None:
      return
    joined_path = '/'.join(object_path)

    # First pass: messages registered on the activity tool for this queue.
    for message in activity_tool.getRegisteredMessageList(self):
      if not (object_path == message.object_path):
        continue
      if not (method_id is None or method_id == message.method_id):
        continue
      if invoke:
        validation_state = message.validate(self, activity_tool)
        if validation_state is not VALID:
          if validation_state is INVALID_PATH:
            # The document no longer exists
            raise ActivityFlushError(
              'The document %s does not exist' % joined_path)
          raise ActivityFlushError(
            'Could not validate %s on %s' % (message.method_id , joined_path))
        # Open question kept from the original code: what happens if
        # invoke calls flushActivity ??
        activity_tool.invoke(message)
        if message.getExecutionState() != MESSAGE_EXECUTED:
          # The message could not be invoked
          raise ActivityFlushError(
            'Could not evaluate %s on %s' % (message.method_id , joined_path))
      activity_tool.unregisterMessage(self, message)

    # Second pass: messages read from the SQL table.
    line_list = read_message_list(path=joined_path, method_id=method_id,
                                  processing_node=None, to_date=None,
                                  include_processing=0)
    for line in line_list:
      line_path = line.path
      line_method_id = line.method_id
      message = self.loadMessage(line.message, uid=line.uid, line=line)
      if not invoke:
        continue
      # Only rows whose processing_node is -1 are validated; any other row
      # is taken as VALID without calling validate().
      validation_state = VALID
      if line.processing_node == -1:
        validation_state = message.validate(self, activity_tool)
      if validation_state is not VALID:
        if validation_state is INVALID_PATH:
          # The document no longer exists
          raise ActivityFlushError(
            'The document %s does not exist' % line_path)
        raise ActivityFlushError(
          'Could not validate %s on %s' % (message.method_id , line_path))
      activity_tool.invoke(message)
      if message.getExecutionState() != MESSAGE_EXECUTED:
        # The message could not be invoked
        raise ActivityFlushError(
          'Could not evaluate %s on %s' % (line_method_id , line_path))

    # All rows that were read are deleted, whether or not they were invoked.
    if len(line_list):
      activity_tool.SQLBase_delMessage(table=self.sql_table,
                                       uid=[line.uid for line in line_list])
Jean-Paul Smets's avatar
Jean-Paul Smets committed
161

162 163 164
  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """Return the number of messages which match the given parameters.

      tag, path and method_id may each be given as a single str, in which
      case they are wrapped into a one-element list before being passed to
      SQLQueue_validateMessageList. Extra keyword arguments are ignored.
      The value returned is the uid_count of the first result row.
    """
    criteria = {'tag': tag, 'path': path, 'method_id': method_id}
    for name in ('tag', 'path', 'method_id'):
      if isinstance(criteria[name], str):
        criteria[name] = [criteria[name]]
    row_list = activity_tool.SQLQueue_validateMessageList(
      message_uid=message_uid,
      serialization_tag=None,
      count=1,
      **criteria)
    return row_list[0].uid_count

  def countMessageWithTag(self, activity_tool, value):
    """Return the number of messages which match the given tag.

      Shortcut for countMessage with value passed as its tag criterion.
    """
    tagged_count = self.countMessage(activity_tool, tag=value)
    return tagged_count
Sebastien Robin's avatar
Sebastien Robin committed
184

185 186 187
  def dumpMessageList(self, activity_tool):
    """
      Dump all messages in the table.

      Returns the list of messages loaded from every row returned by
      activity_tool.SQLQueue_dumpMessageList, or an empty list when
      activity_tool has no such method.
    """
    dump_rows = getattr(activity_tool, 'SQLQueue_dumpMessageList', None)
    if dump_rows is None:
      return []
    return [self.loadMessage(row.message, uid=row.uid, line=row)
            for row in dump_rows()]
195

196
  def distribute(self, activity_tool, node_count):
    """
      Select messages that can be executed and assign them processing_node 0.

      Rows are read with processing_node=-1 and to_date set to the current
      date, READ_MESSAGE_LIMIT rows at a time. The transaction is committed
      after each non-empty read. The method returns when a read yields
      nothing, or once at least MAX_VALIDATED_LIMIT messages have been
      assigned during this call.

      Among the executable messages of one read, those sharing a
      serialization_tag are restricted: only the first one after sorting
      with sort_message_key is assigned, plus the following ones having the
      same group_method_id as it, unless that group_method_id is '\\0'.

      node_count is not used. Nothing is done when activity_tool has no
      SQLQueue_readMessageList.
    """
    read_message_list = getattr(activity_tool, 'SQLQueue_readMessageList',
                                None)
    if read_message_list is None:
      return
    now_date = self.getNow(activity_tool)
    offset = 0
    validated_count = 0
    while True:
      line_list = read_message_list(path=None, method_id=None,
                                    processing_node=-1, to_date=now_date,
                                    include_processing=0, offset=offset,
                                    count=READ_MESSAGE_LIMIT)
      if not line_list:
        return
      transaction.commit()

      # getExecutableMessageList fills message_dict with the messages it
      # accepts from this read.
      validation_text_dict = {'none': 1}
      message_dict = {}
      for line in line_list:
        message = self.loadMessage(line.message, uid=line.uid, line=line)
        message.order_validation_text = self.getOrderValidationText(message)
        self.getExecutableMessageList(activity_tool, message, message_dict,
                                      validation_text_dict,
                                      now_date=now_date)
      if message_dict:
        distributable_uid_set = set()
        serialization_tag_dict = {}
        for message in message_dict.itervalues():
          serialization_tag = message.activity_kw.get('serialization_tag')
          if serialization_tag is not None:
            serialization_tag_dict.setdefault(serialization_tag,
                                              []).append(message)
          else:
            # No serialization tag: nothing restricts this message.
            distributable_uid_set.add(message.uid)
        for tagged_message_list in serialization_tag_dict.itervalues():
          # Sort list of messages to validate the message with highest score
          tagged_message_list.sort(key=sort_message_key)
          first_message = tagged_message_list[0]
          distributable_uid_set.add(first_message.uid)
          group_method_id = first_message.line.group_method_id
          if group_method_id != '\0':
            # Messages of the same group as the first one go along with it.
            for other_message in tagged_message_list[1:]:
              if group_method_id == other_message.line.group_method_id:
                distributable_uid_set.add(other_message.uid)
        distributable_count = len(distributable_uid_set)
        if distributable_count:
          activity_tool.SQLBase_assignMessage(
            table=self.sql_table,
            processing_node=0,
            uid=tuple(distributable_uid_set))
          validated_count += distributable_count
          if validated_count >= MAX_VALIDATED_LIMIT:
            return
      offset += READ_MESSAGE_LIMIT
245

246
  # Validation private methods
247 248
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
                serialization_tag=None):
249 250 251 252 253 254 255
    if isinstance(method_id, str):
      method_id = [method_id]
    if isinstance(path, str):
      path = [path]
    if isinstance(tag, str):
      tag = [tag]

256
    if method_id or message_uid or path or tag or serialization_tag:
257 258 259 260
      validateMessageList = activity_tool.SQLQueue_validateMessageList
      result = validateMessageList(method_id=method_id,
                                   message_uid=message_uid,
                                   path=path,
261
                                   tag=tag,
262
                                   count=False,
263
                                   serialization_tag=serialization_tag)
264 265 266
      message_list = []
      for line in result:
        m = self.loadMessage(line.message,
267
                             line=line,
268 269 270 271 272 273 274 275 276
                             uid=line.uid,
                             date=line.date,
                             processing_node=line.processing_node)
        m.order_validation_text = self.getOrderValidationText(m)
        message_list.append(m)
      return message_list
    else:
      return []

277
  # Required for tests (time shift)
278
  def timeShift(self, activity_tool, delay, processing_node=None):
    """
      Simulate a time shift (required for tests).

      Delegates to activity_tool.SQLQueue_timeShift, which is expected to
      subtract delay from all dates in the SQLQueue message table.
      processing_node is passed through unchanged.
    """
    shift_dates = activity_tool.SQLQueue_timeShift
    shift_dates(delay=delay, processing_node=processing_node)
284

285 286
  def getPriority(self, activity_tool):
    """
      Return the priority of this queue.

      Delegates to self._getPriority, giving it the SQLQueue_getPriority
      method of activity_tool and, as default, the value returned by
      SQLBase.getPriority.
    """
    priority_method = activity_tool.SQLQueue_getPriority
    return self._getPriority(activity_tool,
                             priority_method,
                             SQLBase.getPriority(self, activity_tool))

Jean-Paul Smets's avatar
Jean-Paul Smets committed
290
# Hand the class over to registerActivity (imported from
# Products.CMFActivity.ActivityTool) at import time; presumably this is what
# makes SQLQueue available as an activity type - TODO confirm.
registerActivity(SQLQueue)