SQLDict.py 17 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#                    Jean-Paul Smets-Solanes <jp@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs.
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

29
import random
30
from DateTime import DateTime
Jean-Paul Smets's avatar
Jean-Paul Smets committed
31
from Products.CMFActivity.ActivityTool import registerActivity
32
from Queue import VALID, INVALID_ORDER, INVALID_PATH, EXCEPTION, MAX_PROCESSING_TIME, VALIDATION_ERROR_DELAY, SECONDS_IN_DAY
Jean-Paul Smets's avatar
Jean-Paul Smets committed
33
from RAMDict import RAMDict
34
from Products.CMFActivity.ActiveObject import DISTRIBUTABLE_STATE, INVOKE_ERROR_STATE, VALIDATE_ERROR_STATE
Jean-Paul Smets's avatar
Jean-Paul Smets committed
35 36 37

from zLOG import LOG

38
# Once a message's priority value exceeds this bound, its next failure in
# SQLDict.dequeueMessage moves it to an error state instead of lowering its
# priority once more.
MAX_PRIORITY = 5

# Table sampled with random.choice in SQLDict.dequeueMessage: each priority
# level appears as many times as its weight (100 slots in total), so the
# level tried first is drawn with the percentages noted below.
priority_weight = (
  [1] * 64 +  # 64%
  [2] * 20 +  # 20%
  [3] * 10 +  # 10%
  [4] * 5 +   #  5%
  [5] * 1     #  1%
)
46

47 48 49
class ActivityFlushError(Exception):
    """Raised by SQLDict.flush when a flushed message cannot be executed."""
    pass

Jean-Paul Smets's avatar
Jean-Paul Smets committed
50 51 52 53 54 55
class SQLDict(RAMDict):
  """
    SQL-backed activity queue with dictionary semantics.

    Messages are stored through ZSQL methods looked up on the activity tool
    (SQLDict_writeMessage, SQLDict_readMessage, SQLDict_delMessage, ...).
    Queued rows sharing the same path and method_id are handled as a unit:
    they are registered once per activity buffer, dequeued together and
    deleted together.

    Methods relying on the SQL layer first check with hasattr that the
    relevant ZSQL method exists on the activity tool, and do nothing
    otherwise.
  """

  # Transaction commit methods
  def prepareQueueMessage(self, activity_tool, m):
    """
      Insert message m into the SQL queue.

      Messages unregistered in the meantime (m.is_registered false) are
      skipped. 'priority', 'broadcast' and 'at_date' are read from
      m.activity_kw and default to 1, 0 and the current date.
    """
    if not m.is_registered:
      return
    activity_kw = m.activity_kw
    # TODO: also store the uid of the activity.
    activity_tool.SQLDict_writeMessage(path='/'.join(m.object_path),
                                       method_id=m.method_id,
                                       priority=activity_kw.get('priority', 1),
                                       broadcast=activity_kw.get('broadcast', 0),
                                       message=self.dumpMessage(m),
                                       date=activity_kw.get('at_date', DateTime()))

  def prepareDeleteMessage(self, activity_tool, m):
    """
      Delete every queued row with the same path and method_id as m,
      using a single SQLDict_delMessage call.
    """
    path = '/'.join(m.object_path)
    uid_list = [row.uid for row in
                activity_tool.SQLDict_readUidList(path=path,
                                                  method_id=m.method_id,
                                                  processing_node=None)]
    if uid_list:
      activity_tool.SQLDict_delMessage(uid=uid_list)

  # Registration management
  def registerActivityBuffer(self, activity_buffer):
    """
      Attach empty registration structures for this class to activity_buffer:
      a dict keyed by (object_path tuple, method_id) and a message list.
    """
    class_name = self.__class__.__name__
    setattr(activity_buffer, '_%s_uid_dict' % class_name, {})
    setattr(activity_buffer, '_%s_message_list' % class_name, [])

  def isMessageRegistered(self, activity_buffer, activity_tool, m):
    """Return True if a message with m's path and method_id is registered."""
    uid_dict = getattr(activity_buffer, '_%s_uid_dict' % self.__class__.__name__)
    return (tuple(m.object_path), m.method_id) in uid_dict

  def registerMessage(self, activity_buffer, activity_tool, m):
    """Mark m as registered and record it in activity_buffer."""
    m.is_registered = 1
    class_name = self.__class__.__name__
    uid_dict = getattr(activity_buffer, '_%s_uid_dict' % class_name)
    uid_dict[(tuple(m.object_path), m.method_id)] = 1
    getattr(activity_buffer, '_%s_message_list' % class_name).append(m)

  def unregisterMessage(self, activity_buffer, activity_tool, m):
    """
      Mark m as unregistered and forget its (path, method_id) key.

      m stays in the buffer's message list. However, is_registered = 0 makes
      both getRegisteredMessageList and prepareQueueMessage ignore it, which
      prevents inserting deleted messages into the queue.
    """
    m.is_registered = 0
    uid_dict = getattr(activity_buffer, '_%s_uid_dict' % self.__class__.__name__)
    key = (tuple(m.object_path), m.method_id)
    if key in uid_dict:
      del uid_dict[key]

  def getRegisteredMessageList(self, activity_buffer, activity_tool):
    """
      Return the buffered messages that are still registered, or () when
      activity_buffer has no message list for this class.
    """
    attribute_name = '_%s_message_list' % self.__class__.__name__
    if not hasattr(activity_buffer, attribute_name):
      return ()
    return [m for m in getattr(activity_buffer, attribute_name) if m.is_registered]

  # Queue semantic
  def _postponeUidList(self, activity_tool, uid_list, date, priority):
    """Reschedule the rows in uid_list (if any) for date at priority."""
    if uid_list:
      activity_tool.SQLDict_setPriority(uid=uid_list, date=date, priority=priority)

  def _assignUidList(self, activity_tool, uid_list, processing_node):
    """Assign the rows in uid_list (if any) to processing_node."""
    if uid_list:
      activity_tool.SQLDict_assignMessage(uid=uid_list, processing_node=processing_node)

  def dequeueMessage(self, activity_tool, processing_node):
    """
      Take one due message for processing_node, validate it and invoke it.

      A priority level is drawn from priority_weight first. If nothing is
      due at that level, any due message is taken. All rows sharing the
      selected message's path and method_id are handled together.

      Returns 0 when a message was taken, whatever the outcome. Returns 1
      when nothing was due or SQLDict_readMessage is unavailable.
    """
    if not hasattr(activity_tool, 'SQLDict_readMessage'):
      return 1
    now_date = DateTime()
    # Next processing date in case of error
    next_processing_date = now_date + VALIDATION_ERROR_DELAY
    # Try a message at a randomly weighted priority level scheduled for now,
    # and fall back to any message scheduled for now.
    result = activity_tool.SQLDict_readMessage(processing_node=processing_node,
                                               priority=random.choice(priority_weight),
                                               to_date=now_date)
    if len(result) == 0:
      result = activity_tool.SQLDict_readMessage(processing_node=processing_node,
                                                 priority=None, to_date=now_date)
    if len(result) == 0:
      get_transaction().commit() # Release locks
      return 1

    line = result[0]
    uid_list = [row.uid for row in
                activity_tool.SQLDict_readUidList(path=line.path,
                                                  method_id=line.method_id,
                                                  processing_node=None,
                                                  to_date=now_date)]
    # Set the selected rows to processing so they can not be processed again.
    if uid_list:
      activity_tool.SQLDict_processMessage(uid=uid_list)
    # Release locks before starting a potentially long calculation. This may
    # (1 for 1,000,000 in case of reindexing) leave messages in processing state.
    get_transaction().commit()
    m = self.loadMessage(line.message, uid=line.uid)

    # Validate message (make sure object exists, priority OK, etc.)
    validation_state = m.validate(self, activity_tool)
    if validation_state is not VALID:
      if validation_state in (EXCEPTION, INVALID_PATH):
        # Serious validation error: lower the priority, or give up once the
        # priority is beyond MAX_PRIORITY (the user is not notified here).
        if line.priority > MAX_PRIORITY:
          self._assignUidList(activity_tool, uid_list, VALIDATE_ERROR_STATE)
        else:
          self._postponeUidList(activity_tool, uid_list, next_processing_date,
                                line.priority + 1)
      else:
        # INVALID_ORDER: postpone execution without lowering the priority.
        self._postponeUidList(activity_tool, uid_list, next_processing_date,
                              line.priority)
      get_transaction().commit()
      return 0

    # XXX: what happens if a read conflict error restarts the transaction ?
    activity_tool.invoke(m)
    if m.is_executed:
      if uid_list:
        activity_tool.SQLDict_delMessage(uid=uid_list)
      get_transaction().commit()
      if m.active_process:
        active_process = activity_tool.unrestrictedTraverse(m.active_process)
        if not active_process.hasActivity():
          # No more activity for this process.
          # XXX: should this notification be committed ?
          m.notifyUser(activity_tool, message="Process Finished")
    else:
      # Discard whatever the failed invocation changed.
      get_transaction().abort()
      if line.priority > MAX_PRIORITY:
        # Give up: move the rows to the error state and notify the user.
        self._assignUidList(activity_tool, uid_list, INVOKE_ERROR_STATE)
        m.notifyUser(activity_tool)
      else:
        self._postponeUidList(activity_tool, uid_list, next_processing_date,
                              line.priority + 1)
      get_transaction().commit()
    return 0

  def hasActivity(self, activity_tool, object, **kw):
    """
      Return a true value if messages are queued for object's path.

      With object None this returns 1, the default until active_process
      support is implemented. Returns 0 when the SQL methods are unavailable.
      Extra keyword arguments are passed on to SQLDict_hasMessage.
    """
    # NOTE(review): availability is probed with SQLDict_readMessageList,
    # while SQLDict_hasMessage is the method actually called.
    if hasattr(activity_tool, 'SQLDict_readMessageList'):
      if object is None:
        return 1
      my_object_path = '/'.join(object.getPhysicalPath())
      result = activity_tool.SQLDict_hasMessage(path=my_object_path, **kw)
      if len(result) > 0:
        return result[0].message_count > 0
    return 0

  def _invokeFlushedMessage(self, activity_tool, m, path):
    """Validate and invoke m for flush, raising ActivityFlushError on failure."""
    if m.validate(self, activity_tool) is not VALID:
      # The message no longer exists
      raise ActivityFlushError('The document %s does not exist' % path)
    # XXX: what happens if invoke calls flushActivity ?
    activity_tool.invoke(m)
    if not m.is_executed:
      raise ActivityFlushError('Could not evaluate %s on %s' % (m.method_id, path))

  def flush(self, activity_tool, object_path, invoke=0, method_id=None, commit=0, **kw):
    """
      Remove, and optionally execute, the pending messages of an object.

      object_path is a tuple. When method_id is not None, only messages for
      that method are flushed. Messages registered in the current buffer are
      unregistered first; queued SQL rows are then deleted. With invoke,
      each method is validated and invoked at most once, and failures raise
      ActivityFlushError.

      commit is accepted for interface compatibility but is not used by this
      implementation. Design note: committing here is very likely nonsense;
      flushing should be avoided as much as possible.
    """
    path = '/'.join(object_path)
    if not hasattr(activity_tool, 'SQLDict_readMessageList'):
      return
    method_dict = {} # Prevents calling invoke twice for the same method

    # Messages registered in the activity buffer
    for m in activity_tool.getRegisteredMessageList(self):
      if list(m.object_path) == list(object_path) and \
         (method_id is None or method_id == m.method_id):
        activity_tool.unregisterMessage(self, m)
        if m.method_id not in method_dict:
          method_dict[m.method_id] = 1
          if invoke:
            self._invokeFlushedMessage(activity_tool, m, path)

    # Messages already stored in SQL
    result = activity_tool.SQLDict_readMessageList(path=path, method_id=method_id,
                                                   processing_node=None)
    for line in result:
      line_method_id = line.method_id
      # Only invoke once (it would be different for a queue). This is an
      # optimisation meant to process objects on the same node and minimize
      # network traffic with the ZEO server.
      if line_method_id not in method_dict:
        method_dict[line_method_id] = 1
        m = self.loadMessage(line.message, uid=line.uid)
        self.deleteMessage(activity_tool, m)
        if invoke:
          self._invokeFlushedMessage(activity_tool, m, line.path)

  def getMessageList(self, activity_tool, processing_node=None):
    """
      Return every queued message, each annotated with the processing_node
      and priority of its row. The processing_node argument is not used to
      filter rows.
    """
    # YO: reading all lines might cause a deadlock
    message_list = []
    if hasattr(activity_tool, 'SQLDict_readMessageList'):
      result = activity_tool.SQLDict_readMessageList(path=None, method_id=None,
                                                     processing_node=None,
                                                     to_processing_date=None)
      for line in result:
        m = self.loadMessage(line.message, uid=line.uid)
        m.processing_node = line.processing_node
        m.priority = line.priority
        message_list.append(m)
    return message_list

  def distribute(self, activity_tool, node_count):
    """
      Assign unassigned messages (processing_node -1) to nodes 1..node_count.

      Non-broadcast messages are assigned per path, round robin, one path at
      a time. Broadcast messages are assigned to node 1 and copied once for
      each of nodes 2..node_count.
    """
    processing_node = 1
    if not hasattr(activity_tool, 'SQLDict_readMessageList'):
      return
    now_date = DateTime()
    # NOTE(review): self.max_processing_date is assumed to be initialised by
    # a base class - confirm.
    if (now_date - self.max_processing_date) > MAX_PROCESSING_TIME:
      # Sticky processing messages should be set back to non processing
      max_processing_date = now_date - MAX_PROCESSING_TIME
      self.max_processing_date = now_date
    else:
      max_processing_date = None
    # Only assign non assigned messages
    result = activity_tool.SQLDict_readMessageList(path=None, method_id=None,
                                                   processing_node=-1,
                                                   to_processing_date=max_processing_date)
    get_transaction().commit() # Release locks before starting a potentially long calculation
    path_dict = {}
    for line in result:
      path = line.path
      if line.broadcast:
        # Broadcast messages must be distributed into all nodes.
        activity_tool.SQLDict_assignMessage(processing_node=1, uid=[line.uid])
        for node in range(2, node_count + 1):
          activity_tool.SQLDict_writeMessage(path=path,
                                             method_id=line.method_id,
                                             priority=line.priority,
                                             broadcast=1,
                                             processing_node=node,
                                             message=line.message,
                                             date=line.date)
      elif path not in path_dict:
        # Only assign once (it would be different for a queue)
        path_dict[path] = 1
        activity_tool.SQLDict_assignMessage(path=path, processing_node=processing_node,
                                            uid=None, broadcast=0)
        get_transaction().commit() # Release locks immediately to allow processing of messages
        processing_node = processing_node + 1
        if processing_node > node_count:
          processing_node = 1 # Round robin

  # Validation private methods
  def _validate_after_method_id(self, activity_tool, message, value):
    """
      Return INVALID_ORDER if SQLDict_validateMessageList counts rows for
      any of the given method ids, VALID otherwise.

      value is a single method id (plain or unicode string) or a sequence
      of them.
    """
    # Wrap a single id, whether str or unicode. Iterating over a bare string
    # would check each of its characters as a separate method id.
    if isinstance(value, (str, type(u''))):
      value = [value]
    for method_id in value:
      result = activity_tool.SQLDict_validateMessageList(method_id=method_id,
                                                         message_uid=None, path=None)
      if result[0].uid_count > 0:
        return INVALID_ORDER
    return VALID

  def _validate_after_path(self, activity_tool, message, value):
    """
      Return INVALID_ORDER if SQLDict_validateMessageList counts rows for
      path value, VALID otherwise.
    """
    result = activity_tool.SQLDict_validateMessageList(method_id=None,
                                                       message_uid=None, path=value)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID

  def _validate_after_message_uid(self, activity_tool, message, value):
    """
      Return INVALID_ORDER if SQLDict_validateMessageList counts rows for
      message uid value, VALID otherwise.
    """
    result = activity_tool.SQLDict_validateMessageList(method_id=None,
                                                       message_uid=value, path=None)
    if result[0].uid_count > 0:
      return INVALID_ORDER
    return VALID

  # Required for tests (time shift)
  def timeShift(self, activity_tool, delay):
    """
      To simulate timeShift, we simply subtract delay from
      all dates in SQLDict message table
    """
    activity_tool.SQLDict_timeShift(delay = delay * SECONDS_IN_DAY)
343

Jean-Paul Smets's avatar
Jean-Paul Smets committed
344
# Register this activity class with the ActivityTool module (presumably so it
# can be selected as an activity queue type - see ActivityTool.registerActivity).
registerActivity(SQLDict)