SQLQueue.py 9.93 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2
##############################################################################
#
3
# Copyright (c) 2002,2007 Nexedi SA and Contributors. All Rights Reserved.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
from Products.CMFActivity.ActivityTool import Message, registerActivity
Romain Courteaud's avatar
Romain Courteaud committed
30
from ZODB.POSException import ConflictError
31
from SQLBase import SQLBase, sort_message_key
32
from zExceptions import ExceptionFormatter
Jean-Paul Smets's avatar
Jean-Paul Smets committed
33

34
import transaction
35

36
from zLOG import LOG, WARNING, ERROR, INFO, PANIC, TRACE
Jean-Paul Smets's avatar
Jean-Paul Smets committed
37

38 39 40 41 42
# Stop validating more messages when this limit is reached: distribute()
# returns as soon as it has assigned at least this many messages in one call.
MAX_VALIDATED_LIMIT = 1000
# Read this many messages to validate: this is the page size (count) of each
# _getMessageList query issued by distribute(), and the step of its offset.
READ_MESSAGE_LIMIT = 1000

43 44
# Maximum number of messages handed to a single SQLQueue_writeMessageList
# call by prepareQueueMessageList().
MAX_MESSAGE_LIST_SIZE = 100

45
class SQLQueue(SQLBase):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
46
  """
Jean-Paul Smets's avatar
Jean-Paul Smets committed
47 48 49
    A simple SQL table based queue: messages are stored in the table named
    by sql_table (through the SQLQueue_* SQL methods) rather than in an
    OOBTree. It should be compatible with transactions and provide
    sequentiality.
Jean-Paul Smets's avatar
Jean-Paul Smets committed
50
  """
51
  sql_table = 'message_queue'
52 53

  def prepareQueueMessageList(self, activity_tool, message_list):
    """
      Write the registered messages of message_list to the SQL table.

      Messages whose is_registered attribute is false are ignored. The
      remaining ones are written by slices of at most MAX_MESSAGE_LIST_SIZE
      messages, with one SQLQueue_writeMessageList call per slice.

      As a side effect, order_validation_text is set on every written
      message before it is dumped.
    """
    registered = [message for message in message_list
                  if message.is_registered]
    for start in xrange(0, len(registered), MAX_MESSAGE_LIST_SIZE):
      batch = registered[start:start + MAX_MESSAGE_LIST_SIZE]
      # One new uid per message of the batch. The uid list is also stored
      # in the ZODB.
      id_tool = activity_tool.getPortalObject().portal_ids
      write_kw = dict(
        uid_list=id_tool.generateNewIdList(
          id_generator='uid', id_group='portal_activity_queue',
          id_count=len(batch)),
        path_list=['/'.join(message.object_path) for message in batch],
        active_process_uid_list=[message.active_process_uid
                                 for message in batch],
        method_id_list=[message.method_id for message in batch],
        priority_list=[message.activity_kw.get('priority', 1)
                       for message in batch],
        date_list=[message.activity_kw.get('at_date') for message in batch],
        group_method_id_list=[message.getGroupId() for message in batch],
        tag_list=[message.activity_kw.get('tag', '') for message in batch],
        serialization_tag_list=[
          message.activity_kw.get('serialization_tag', '')
          for message in batch])
      # A message whose validation text is 'none' is written with
      # processing node 0, any other message with processing node -1.
      processing_node_list = write_kw['processing_node_list'] = []
      for message in batch:
        validation_text = self.getOrderValidationText(message)
        message.order_validation_text = validation_text
        if validation_text == 'none':
          processing_node_list.append(0)
        else:
          processing_node_list.append(-1)
      # Dump only now, once order_validation_text has been set above.
      write_kw['message_list'] = map(Message.dump, batch)
      activity_tool.SQLQueue_writeMessageList(**write_kw)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
87

88
  def hasActivity(self, activity_tool, object, method_id=None,
                  only_valid=None, active_process_uid=None):
    """
      Tell whether at least one message matches the given criteria.

      object may be None, in which case no path is given to the query;
      otherwise its physical path, joined with '/', is used.

      Returns 0 when activity_tool has no SQLQueue_hasMessage method or
      when the query returns no line; otherwise returns whether the
      message_count of the first returned line is positive.
    """
    has_message = getattr(activity_tool, 'SQLQueue_hasMessage', None)
    if has_message is None:
      return 0
    path = None if object is None else '/'.join(object.getPhysicalPath())
    row_list = has_message(path=path,
                           method_id=method_id,
                           only_valid=only_valid,
                           active_process_uid=active_process_uid)
    if len(row_list) > 0:
      return row_list[0].message_count > 0
    return 0
Jean-Paul Smets's avatar
Jean-Paul Smets committed
99

100 101 102
  def countMessage(self, activity_tool, tag=None, path=None,
                   method_id=None, message_uid=None, **kw):
    """
      Return the number of messages which match the given parameters.

      tag, path and method_id may each be given as a single string, which
      is then wrapped in a one-element list before being passed to
      SQLQueue_validateMessageList. Extra keyword arguments are accepted
      and ignored.

      The result is the uid_count of the first line returned by the query.
    """
    criteria = {}
    for name, value in (('method_id', method_id),
                        ('path', path),
                        ('tag', tag)):
      if isinstance(value, str):
        value = [value]
      criteria[name] = value
    row_list = activity_tool.SQLQueue_validateMessageList(
      message_uid=message_uid,
      serialization_tag=None,
      count=1,
      **criteria)
    return row_list[0].uid_count

  def countMessageWithTag(self, activity_tool, value):
    """
      Return the number of messages which match the given tag.

      This is a shortcut for countMessage() called with tag=value.
    """
    tagged_count = self.countMessage(activity_tool, tag=value)
    return tagged_count
Sebastien Robin's avatar
Sebastien Robin committed
122

123 124 125
  def dumpMessageList(self, activity_tool):
    """
      Dump all messages in the table.

      Returns a list with one Message per line returned by
      SQLQueue_dumpMessageList, or an empty list when activity_tool has no
      such method.
    """
    dump_method = getattr(activity_tool, 'SQLQueue_dumpMessageList', None)
    if dump_method is None:
      return []
    return [Message.load(row.message, uid=row.uid, line=row)
            for row in dump_method()]
133

134
  def distribute(self, activity_tool, node_count):
    """
      Assign validated messages to processing node 0.

      Does nothing when activity_tool has no SQLBase_assignMessage method.
      Otherwise, messages with processing_node=-1 are read by pages of
      READ_MESSAGE_LIMIT lines until a page is empty or until at least
      MAX_VALIDATED_LIMIT messages were assigned during this call.

      node_count is not used by this implementation.
    """
    assign_message = getattr(activity_tool, 'SQLBase_assignMessage', None)
    if assign_message is None:
      return
    offset = 0
    now_date = self.getNow(activity_tool)
    validated_count = 0
    while True:
      line_list = self._getMessageList(activity_tool, processing_node=-1,
                                       to_date=now_date, processing=0,
                                       offset=offset,
                                       count=READ_MESSAGE_LIMIT)
      if not line_list:
        return
      # Commit after every non-empty read, before validating the page.
      transaction.commit()

      validation_text_dict = {'none': 1}
      message_dict = {}
      for line in line_list:
        message = Message.load(line.message, uid=line.uid, line=line)
        if not hasattr(message, 'order_validation_text'): # BBB
          message.order_validation_text = self.getOrderValidationText(message)
        # getExecutableMessageList is expected to add the message to
        # message_dict when it can be executed.
        self.getExecutableMessageList(activity_tool, message, message_dict,
                                      validation_text_dict, now_date=now_date)
      if message_dict:
        uid_set = set()
        by_serialization_tag = {}
        for message in message_dict.itervalues():
          serialization_tag = message.activity_kw.get('serialization_tag')
          if serialization_tag is not None:
            by_serialization_tag.setdefault(serialization_tag,
                                            []).append(message)
          else:
            # No serialization tag: always distributable.
            uid_set.add(message.uid)
        for candidate_list in by_serialization_tag.itervalues():
          # Sort list of messages to validate the message with highest score
          candidate_list.sort(key=sort_message_key)
          head = candidate_list[0]
          uid_set.add(head.uid)
          group_method_id = head.line.group_method_id
          if group_method_id != '\0':
            # Messages sharing the group method of the selected one are
            # distributed together with it.
            uid_set.update(
              other.uid for other in candidate_list[1:]
              if group_method_id == other.line.group_method_id)
        if uid_set:
          assign_message(table=self.sql_table,
                         processing_node=0, uid=tuple(uid_set))
          validated_count += len(uid_set)
          if validated_count >= MAX_VALIDATED_LIMIT:
            return
      offset += READ_MESSAGE_LIMIT
184

185
  # Validation private methods
186 187
  def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
                serialization_tag=None):
188 189 190 191 192 193 194
    if isinstance(method_id, str):
      method_id = [method_id]
    if isinstance(path, str):
      path = [path]
    if isinstance(tag, str):
      tag = [tag]

195
    if method_id or message_uid or path or tag or serialization_tag:
196 197 198 199
      validateMessageList = activity_tool.SQLQueue_validateMessageList
      result = validateMessageList(method_id=method_id,
                                   message_uid=message_uid,
                                   path=path,
200
                                   tag=tag,
201
                                   count=False,
202
                                   serialization_tag=serialization_tag)
203 204
      message_list = []
      for line in result:
205
        m = Message.load(line.message,
206
                             line=line,
207 208 209
                             uid=line.uid,
                             date=line.date,
                             processing_node=line.processing_node)
210 211
        if not hasattr(m, 'order_validation_text'): # BBB
          m.order_validation_text = self.getOrderValidationText(m)
212 213 214 215 216
        message_list.append(m)
      return message_list
    else:
      return []

217
  # Required for tests (time shift)
218
  def timeShift(self, activity_tool, delay, processing_node=None):
    """
      Simulate a time shift (required for tests).

      The work is delegated to the SQLQueue_timeShift method of
      activity_tool, which is expected to subtract delay from all dates
      in the SQLQueue message table.
    """
    shift_method = activity_tool.SQLQueue_timeShift
    shift_method(delay=delay, processing_node=processing_node)
224

225 226
  def getPriority(self, activity_tool):
    """
      Return the priority of this queue, as computed by _getPriority.

      _getPriority is given the SQLQueue_getPriority method of
      activity_tool and, as default, the value returned by
      SQLBase.getPriority.
    """
    priority_method = activity_tool.SQLQueue_getPriority
    fallback = SQLBase.getPriority(self, activity_tool)
    return self._getPriority(activity_tool, priority_method, fallback)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
230
# Register the SQLQueue class with the activity tool at import time
# (presumably so that it can be looked up as an available activity type).
registerActivity(SQLQueue)