Commit d62a0d18 authored by Chris McDonough

- "Bucket finalization" is now done more aggressively. Instead of waiting

  until a bucket is garbage collected (which may be much later than the
  expiration of that bucket), we "finalize" a bucket as soon as possible
  after it gets expired.  This effectively means that the "delete notifier"
  will be called much closer to the time that a transient object actually
  expires.
                                                                                
- Add a "_last_finalized_timeslice" counter; this counter keeps track of 
  the bucket which was finalized last.  Set the initial value of
  "_last_finalized_timeslice" to -period; this services the unit tests for
  finalization, where there can actually be a timeslice that is 0 and we 
   need to finalize that bucket.
                                                                                
- Add a series of locks to prevent finalization, replentishment, and
  garbage collection from being attempted by more than one thread
  simultaneously.
                                                                                
- Add "Fake" module for interactive testing purposes (swap out BTree for
  simpler object during stress tests for isolation purposes).
                                                                                
- Allow DATA_CLASS to be specified (this is the main data structure
  class).

- Update docs and tests to account for new finalization strategy.
parent a547aeb6
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""
Module used for testing transience (BTree-API-conforming data structure)
"""
from Persistence.mapping import PersistentMapping
import sys

class FakeIOBTree(PersistentMapping):
    def keys(self, min, max):
        L = []
        if min is None:
            min = 0
        if max is None:
            max = sys.maxint
        for k in self.data:
            if min <= k <= max:
                L.append(k)
        return L
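
The commit message mentions swapping this simpler structure in for a
BTree during stress tests.  A rough interactive sketch of how
FakeIOBTree mimics the ranged keys(min, max) API of an IOBTree (the
keys below are made up, and it assumes a Python 2 Zope environment
where Persistence is importable):

    fake = FakeIOBTree()
    for ts in (0, 20, 40, 60):
        fake[ts] = 'bucket for %s' % ts

    print fake.keys(20, 40)    # [20, 40], like IOBTree.keys(min, max)
    print fake.keys(None, 20)  # None bounds are open-ended: [0, 20]

Unlike a real IOBTree, the keys come back in arbitrary dictionary
order rather than sorted order, which is acceptable for isolation
testing.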
@@ -32,7 +32,7 @@ Timeslices

 Data Structures Maintained by a Transient Object Container

-  The TOC maintains five important kinds of data structures:
+  The TOC maintains three important kinds of data structures:

   - a "_data" structure, which is an IOBTree mapping a "timeslice"
     integer to a "bucket" (see next bullet for definition of bucket).
@@ -43,8 +43,11 @@ Data Structures Maintained by a Transient Object Container

   "current" bucket, which is the bucket that is contained within the
   _data structure with a key equal to the "current" timeslice.

-  - A "max_timeslice" integer, which is equal to the "largest" timeslice
-    for which there exists a bucket in the _data structure.
+  - A "max_timeslice" integer, which is equal to the "largest"
+    timeslice for which there exists a bucket in the _data structure.
+    This is an optimization given that key operations against BTrees
+    can be slow and could cause conflicts (the same could be achieved
+    via _data.maxKey() otherwise).
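
  To make the timeslice arithmetic concrete, here is a sketch of how a
  "current" timeslice id can be derived by quantizing the clock into
  period-sized windows (illustrative only; the product's own helper for
  this is getCurrentTimeslice):

    import time

    def current_timeslice(period):
        # every moment falls in a window whose id is the window's
        # start, e.g. with period=20, t=1047 maps to timeslice 1040
        now = int(time.time())
        return now - (now % period)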
When a Transient Object is created via new_or_existing, it is added
to the "current" bucket. As time goes by, the bucket to which the
@@ -67,25 +70,25 @@ How the TransientObjectContainer Determines if a TransientObject is "Current"

   All "current" timeslice buckets (as specified by the timeout) are
   searched for the transient object, most recent bucket first.

-Housekeeping: Notification, Garbage Collection, and Bucket
+Housekeeping: Finalization, Garbage Collection, and Bucket
 Replentishing

-  The TOC performs "notification", "garbage collection", and "bucket
+  The TOC performs "finalization", "garbage collection", and "bucket
   replentishing".  It performs these tasks "in-band".  This means that
   the TOC does not maintain a separate thread that wakes up every so
   often to do these housekeeping tasks.  Instead, during the course of
   normal operations, the TOC opportunistically performs them.

+  Finalization is defined as optionally calling a function at bucket
+  expiration time against all transient objects contained within that
+  bucket.  The optional function call is user-defined, but it is
+  managed by the "notifyDel" method of the TOC.
+
   Garbage collection is defined as deleting "expired" buckets in the
   _data structure (the _data structure maps a timeslice to a bucket).
   Typically this is done by throwing away one or more buckets in the
   _data structure after they expire.

-  Notification is defined as optionally calling a function at TOC
-  finalization time against individual transient object contained
-  within a bucket.  The optional function call is user-defined, but it
-  is managed by the "notifyDel" method of the TOC.
-
   Bucket replentishing is defined as the action of (opportunistically)
   creating more buckets to insert into the _data structure,
   replacing ones that are deleted during garbage collection.  The act
@@ -93,7 +96,9 @@ Replentishing

   will be immediately created thereafter.  We create new buckets in
   batches to reduce the possibility of conflicts.

-  Housekeeping is performed on a somewhat random basis to avoid
+  Finalization is attempted on every call to the transience machinery
+  to make TOs appear to expire "on time".  Garbage collection and
+  replentishment are performed on a somewhat random basis to avoid
   unnecessary conflicts.
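
  A sketch of that "somewhat random basis" gating (the 1-in-20 odds
  below are made up; the product's _roll method additionally rigs the
  toss during an emergency bucket shortage):

    import random

    def roll(odds=20):
        # each caller has a 1-in-odds chance of doing the housekeeping;
        # losers skip it, so concurrent threads rarely collide
        return random.randrange(0, odds) == 0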
Goals
......
@@ -44,6 +44,7 @@ from AccessControl.User import nobody

 from zLOG import LOG, WARNING, INFO
 from TransientObject import TransientObject
+from Fake import FakeIOBTree

 ADD_CONTAINER_PERM = 'Add Transient Object Container'
 MGMT_SCREEN_PERM = 'View management screens'
@@ -54,6 +55,7 @@ MANAGE_CONTAINER_PERM = 'Manage Transient Object Container'

 SPARE_BUCKETS = 15 # minimum number of buckets to keep "spare"
 BUCKET_CLASS = OOBTree # constructor for buckets
+DATA_CLASS = IOBTree # constructor for main data structure (timeslice->"bucket")

 STRICT = os.environ.get('Z_TOC_STRICT', '')
 DEBUG = int(os.environ.get('Z_TOC_DEBUG', 0))
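
Because the mapping class is now a module-level constant, a stress test
can point it at the simpler Fake structure.  A hypothetical toggle (not
part of this commit) mirroring the Z_TOC_* environment switches above:

    # hypothetical: swap in FakeIOBTree for _data during stress testing
    if os.environ.get('Z_TOC_FAKE_DATA'):
        DATA_CLASS = FakeIOBTree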
@@ -130,6 +132,14 @@ class TransientObjectContainer(SimpleItem):

     security.setDefaultAccess('deny')

+    # initialize locks used for finalization, replentishing, and
+    # garbage collection (used in _finalize, _replentish, and _gc
+    # respectively)
+    finalize_lock = thread.allocate_lock()
+    replentish_lock = thread.allocate_lock()
+    gc_lock = thread.allocate_lock()
+
     def __init__(self, id, title='', timeout_mins=20, addNotification=None,
                  delNotification=None, limit=0, period_secs=20):
         self.id = id
@@ -190,13 +200,14 @@ class TransientObjectContainer(SimpleItem):
         # "bucket".  Each bucket will contain a set of transient items.
         # Transient items move automatically from bucket-to-bucket inside
         # of the _data structure based on last access time (e.g.
-        # "get" calls), escaping destruction only if they move quickly
-        # enough.
+        # "get" calls), escaping expiration and eventual destruction only
+        # if they move quickly enough.
         #
         # We make enough buckets initially to last us a while, and
         # we subsequently extend _data with fresh buckets and remove old
         # buckets as necessary during normal operations (see
         # _gc() and _replentish()).
-        self._data = IOBTree()
+        self._data = DATA_CLASS()

         # populate _data with some number of buckets, each of which
         # is "current" for its timeslice key
@@ -207,15 +218,27 @@ class TransientObjectContainer(SimpleItem):
                                self._period)
             for i in new_slices:
                 self._data[i] = BUCKET_CLASS()
-            # create an Increaser for max timeslice
+            # max_timeslice is at any time during operations the highest
+            # key value in _data.  Its existence is an optimization; getting
+            # the maxKey of a BTree directly is read-conflict-prone.
             self._max_timeslice = Increaser(max(new_slices))
         else:
             self._data[0] = BUCKET_CLASS() # sentinel value for non-expiring
             self._max_timeslice = Increaser(0)

+        # '_last_finalized_timeslice' is a value that indicates which
+        # timeslice had its items last run through the finalization
+        # process.  The finalization process calls the delete notifier for
+        # each expired item.
+        self._last_finalized_timeslice = Increaser(-self._period)
+
-        # our "_length" is the length of _index.
+        # our "_length" is the number of "active" data objects in _data.
+        # it does not include items that are still kept in _data but need to
+        # be garbage collected.
         #
         # we need to maintain the length of the index structure separately
-        # because getting the length of a BTree is very expensive.
+        # because getting the length of a BTree is very expensive, and it
+        # doesn't really tell us which ones are "active" anyway.
         try: self._length.set(0)
         except AttributeError: self._length = self.getLen = Length()
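
Increaser, used above, is a small persistent counter whose ZODB
conflict resolution keeps the largest value, so two transactions that
both push it upward merge instead of raising a ConflictError.  A
minimal sketch of the idea (not the product's exact class):

    from Persistence import Persistent

    class MaxInteger(Persistent):  # hypothetical name; here it is Increaser
        def __init__(self, v):
            self.value = v
        def set(self, v):
            self.value = v
        def __call__(self):
            return self.value
        def _p_resolveConflict(self, old_state, saved_state, new_state):
            # ZODB passes three state dicts; keeping the max means
            # concurrent increases merge cleanly
            saved_state['value'] = max(saved_state['value'],
                                       new_state['value'])
            return saved_state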
@@ -241,77 +264,83 @@ class TransientObjectContainer(SimpleItem):
         return result

     def _move_item(self, k, current_ts, default=None):
-        if self._timeout_slices:
+        if not self._timeout_slices:
+            # special case for no timeout value
+            bucket = self._data.get(0)
+            return bucket.get(k, default)
+
+        # always call finalize
+        self._finalize(current_ts)
+
+        # call gc and/or replentish on an only-as-needed basis
+        if self._roll(current_ts, 'replentish'):
+            self._replentish(current_ts)
+        if self._roll(current_ts, 'gc'):
+            self._gc(current_ts)
+
+        # SUBTLETY ALERT TO SELF: do not "improve" the code below
+        # unnecessarily, as it will end only in tears.  The lack of aliases
+        # and the ordering is intentional.
         STRICT and _assert(self._data.has_key(current_ts))
-            current = self._getCurrentSlices(current_ts)
+        current_slices = self._getCurrentSlices(current_ts)
         found_ts = None
-            for ts in current:
-                bucket = self._data.get(ts)
+        for ts in current_slices:
+            abucket = self._data.get(ts, None)
+            if abucket is None:
+                DEBUG and TLOG('_move_item: no bucket for ts %s' % ts)
+                continue
             DEBUG and TLOG(
-                '_move_item: bucket for ts %s is %s' % (ts, id(bucket)))
-                # dont use hasattr here (it hides conflict errors)
-                if getattr(bucket, 'has_key', None):
-                    if DEBUG:
-                        keys = list(bucket.keys())
+                '_move_item: bucket for ts %s is %s' % (ts, id(abucket)))
             DEBUG and TLOG(
                 '_move_item: keys for ts %s (bucket %s)-- %s' %
-                (ts, id(bucket), str(keys))
+                (ts, id(abucket), str(list(abucket.keys())))
                 )
-                if bucket.has_key(k):
+            # uhghost?
+            if abucket.get(k, None) is not None:
                 found_ts = ts
                 break
+        DEBUG and TLOG('_move_item: found_ts is %s' % found_ts)
         if found_ts is None:
+            DEBUG and TLOG('_move_item: returning default of %s' % default)
             return default
-            bucket = self._data[found_ts]
-            item = bucket[k]
-            if found_ts != current_ts:
+        if current_ts != found_ts:
+            DEBUG and TLOG('_move_item: current_ts (%s) != found_ts (%s), '
+                           'moving to current' % (current_ts, found_ts))
             DEBUG and TLOG(
-                '_move_item: moving item %s from %s to %s' % (
-                k, found_ts, current_ts))
-                del bucket[k]
+                '_move_item: keys for found_ts %s (bucket %s): %s' % (
+                found_ts, id(self._data[found_ts]),
+                `list(self._data[found_ts].keys())`)
+                )
+            self._data[current_ts][k] = self._data[found_ts][k]
             if not issubclass(BUCKET_CLASS, Persistent):
                 # tickle persistence machinery
-                self._data[found_ts] = bucket
+                self._data[current_ts] = self._data[current_ts]
             DEBUG and TLOG(
-                '_move_item: deleted key %s from bucket %s' % (
-                k, id(bucket))
-                )
-                if DEBUG:
-                    keys = list(bucket.keys())
-            DEBUG and TLOG(
-                '_move_item: keys for found_ts %s (bucket %s): %s' % (
-                found_ts, id(bucket), str(keys))
-                )
-            STRICT and _assert(bucket.get(k, None) is None)
-            STRICT and _assert(not bucket.has_key(k))
-            current_bucket = self._data[current_ts]
-            current_bucket[k] = item
+                '_move_item: copied item %s from %s to %s (bucket %s)' % (
+                k, found_ts, current_ts, id(self._data[current_ts])))
+            del self._data[found_ts][k]
+            if not issubclass(BUCKET_CLASS, Persistent):
+                # tickle persistence machinery
-                self._data[current_ts] = current_bucket
-        else:
-            # special case for no timeout value
-            bucket = self._data.get(0)
-            item = bucket.get(k, default)
+                self._data[found_ts] = self._data[found_ts]
+            DEBUG and TLOG(
+                '_move_item: deleted item %s from ts %s (bucket %s)' % (
+                k, found_ts, id(self._data[found_ts]))
+                )
+            STRICT and _assert(self._data[found_ts].get(k, None) is None)
+            STRICT and _assert(not self._data[found_ts].has_key(k))
         # dont use hasattr here (it hides conflict errors)
-        if getattr(item, 'setLastAccessed', None):
-            item.setLastAccessed()
-        return item
+        if getattr(self._data[current_ts][k], 'setLastAccessed', None):
+            self._data[current_ts][k].setLastAccessed()
+        DEBUG and TLOG('_move_item: returning %s from current_ts %s '
+                       % (k, current_ts))
+        return self._data[current_ts][k]
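
Stripped of the persistence tickles, logging, and asserts, the
move-on-access behavior above amounts to this toy version over plain
dicts (illustrative only; it assumes data already has a bucket for
current_ts):

    def move_to_current(data, k, current_ts, default=None):
        # search the most recent bucket first, then move any hit into
        # the "current" bucket so the item escapes expiration
        for ts in sorted(data.keys(), reverse=True):
            if k in data[ts]:
                if ts != current_ts:
                    data[current_ts][k] = data[ts][k]
                    del data[ts][k]
                return data[current_ts][k]
        return default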
     def _all(self):
         if self._timeout_slices:
@@ -319,6 +348,8 @@ class TransientObjectContainer(SimpleItem):
         else:
             current_ts = 0

+        self._finalize(current_ts)
+
         if self._roll(current_ts, 'replentish'):
             self._replentish(current_ts)
@@ -485,22 +516,112 @@ class TransientObjectContainer(SimpleItem):
             DEBUG and TLOG('_roll: %s rigged toss' % reason)
             return True
         else:
-            # we're not in an emergency bucket shortage, so we can take
-            # our chances during the roll.  It's highly unlikely that two
-            # threads will win the roll simultaneously, so we avoid a certain
-            # class of conflicts here.
+            # we're not in an emergency bucket shortage, so we can
+            # take our chances during the roll.  It's unlikely that
+            # two threads will win the roll simultaneously, so we
+            # avoid a certain class of conflicts here.
             if random.randrange(low, high) == low: # WINNAH!
                 DEBUG and TLOG("_roll: %s roll winner" % reason)
                 return True
             DEBUG and TLOG("_roll: %s roll loser" % reason)
             return False

+    def _get_max_expired_ts(self, now):
+        return now - (self._period * (self._timeout_slices + 1))
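
A worked example with hypothetical numbers: given a 20-second period
and a one-minute timeout (so _timeout_slices is 3), the newest
timeslice that can be considered fully expired trails "now" by four
periods:

    period = 20          # seconds per timeslice
    timeout_slices = 3   # a 60-second timeout spans three slices
    now = 10000          # an arbitrary current timeslice

    max_expired_ts = now - (period * (timeout_slices + 1))
    assert max_expired_ts == 9920  # buckets keyed <= 9920 are all expired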
+    def _finalize(self, now):
+        if not self._timeout_slices:
+            DEBUG and TLOG('_finalize: doing nothing (no timeout)')
+            return # don't do any finalization if there is no timeout
+
+        # The nature of sessioning is that when the timeslice rolls
+        # over, all active threads will try to do a lot of work during
+        # finalization, all but one unnecessarily.  We really don't
+        # want more than one thread at a time to try to finalize
+        # buckets at the same time so we try to lock.  We give up if we
+        # can't lock immediately because it doesn't matter if we skip
+        # a couple of opportunities for finalization, as long as it
+        # gets done by some thread eventually.  A similar pattern
+        # exists for _gc and _replentish.
+        if not self.finalize_lock.acquire(0):
+            DEBUG and TLOG('_finalize: couldnt acquire lock')
+            return
+
+        try:
+            DEBUG and TLOG('_finalize: lock acquired successfully')
+            if now is None:
+                now = getCurrentTimeslice(self._period) # for unit tests
+
+            # we want to start finalizing from one timeslice after the
+            # timeslice which we last finalized.  Note that finalizing
+            # an already-finalized bucket somehow sends persistence
+            # into a spin with an exception later raised:
+            # "SystemError: error return without exception set",
+            # typically coming from
+            # Products.Sessions.SessionDataManager, line 182, in
+            # _getSessionDataObject (if getattr(ob, '__of__', None)
+            # and getattr(ob, 'aq_parent', None)).  According to this
+            # email message from Jim, it may be because the ob is
+            # ghosted and doesn't have a _p_jar somehow:
+            # http://mail.zope.org/pipermail/zope3-dev/2003-February/005625.html
+            start_finalize = self._last_finalized_timeslice() + self._period
+
+            # we want to finalize only up to the maximum expired timeslice
+            max_ts = self._get_max_expired_ts(now)
+
+            if start_finalize >= max_ts:
+                DEBUG and TLOG(
+                    '_finalize: start_finalize (%s) >= max_ts (%s), '
+                    'doing nothing' % (start_finalize, max_ts))
+                return
+
+            DEBUG and TLOG('_finalize: now is %s' % now)
+            DEBUG and TLOG('_finalize: max_ts is %s' % max_ts)
+            DEBUG and TLOG('_finalize: start_finalize is %s' % start_finalize)
+
+            to_finalize = list(self._data.keys(start_finalize, max_ts))
+            DEBUG and TLOG('_finalize: to_finalize is %s' % `to_finalize`)
+
+            delta = 0
+            for key in to_finalize:
+                assert(start_finalize <= key <= max_ts)
+                STRICT and _assert(self._data.has_key(key))
+                values = list(self._data[key].values())
+                DEBUG and TLOG('_finalize: values to notify from ts %s '
+                               'are %s' % (key, `list(values)`))
+                delta += len(values)
+                for v in values:
+                    self.notifyDel(v)
+            if delta:
+                self._length.change(-delta)
+
+            DEBUG and TLOG('_finalize: setting _last_finalized_timeslice '
+                           'to max_ts of %s' % max_ts)
+            self._last_finalized_timeslice.set(max_ts)
+        finally:
+            self.finalize_lock.release()
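
The try-acquire-or-skip pattern used above (and, per the comment, by
_gc and _replentish too) looks like this in isolation, using the same
Python 2 thread module the product relies on:

    import thread

    housekeeping_lock = thread.allocate_lock()

    def housekeep():
        if not housekeeping_lock.acquire(0):  # 0 means "don't block"
            return  # another thread is on it; skipping is harmless
        try:
            pass  # ... do the actual housekeeping work here ...
        finally:
            housekeeping_lock.release()  # never hold the lock on error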
+    def _replentish(self, now):
+        # available_spares == the number of "spare" buckets that exist in
+        # "_data"
+        if not self._timeout_slices:
+            return # do nothing if no timeout
+
+        if not self.replentish_lock.acquire(0):
+            DEBUG and TLOG('_replentish: couldnt acquire lock')
+            return
+
+        try:
+            max_ts = self._max_timeslice()
+            available_spares = (max_ts - now) / self._period
+            DEBUG and TLOG('_replentish: now = %s' % now)
@@ -508,7 +629,13 @@ class TransientObjectContainer(SimpleItem):
             DEBUG and TLOG('_replentish: available_spares = %s'
                            % available_spares)
-            if available_spares < SPARE_BUCKETS:
+            if available_spares >= SPARE_BUCKETS:
+                DEBUG and TLOG('_replentish: available_spares (%s) >= '
+                               'SPARE_BUCKETS (%s), doing '
+                               'nothing' % (available_spares, SPARE_BUCKETS))
+                return
+
+            if max_ts < now:
                 replentish_start = now
                 replentish_end = now + (self._period * SPARE_BUCKETS)
@@ -538,16 +665,24 @@ class TransientObjectContainer(SimpleItem):
                 time.sleep(random.uniform(0, 1)) # add entropy
                 raise
             self._max_timeslice.set(max(new_buckets))
+        finally:
+            self.replentish_lock.release()
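
A worked example of the spare-bucket arithmetic (numbers made up):

    period = 20
    SPARE_BUCKETS = 15
    now = 1000      # current timeslice
    max_ts = 1200   # highest bucket key currently in _data

    available_spares = (max_ts - now) / period
    assert available_spares == 10
    # 10 < 15, so replentishment proceeds, extending _data with fresh
    # buckets out through roughly now + (period * SPARE_BUCKETS)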
+    def _gc(self, now=None):
+        if not self._timeout_slices:
+            return # dont do gc if there is no timeout
+
+        if not self.gc_lock.acquire(0):
+            DEBUG and TLOG('_gc: couldnt acquire lock')
+            return
+
+        try:
+            if now is None:
+                now = getCurrentTimeslice(self._period) # for unit tests
-            max_ts = now - (self._period * (self._timeout_slices + 1))
-            to_notify = []
+            # we want to garbage collect all buckets that have already been run
+            # through finalization
+            max_ts = self._last_finalized_timeslice()
+            DEBUG and TLOG('_gc: now is %s' % now)
+            DEBUG and TLOG('_gc: max_ts is %s' % max_ts)
@@ -555,16 +690,10 @@ class TransientObjectContainer(SimpleItem):
             for key in list(self._data.keys(None, max_ts)):
                 assert(key <= max_ts)
                 STRICT and _assert(self._data.has_key(key))
-                for v in self._data[key].values():
-                    to_notify.append(v)
-                    self._length.change(-1)
                 DEBUG and TLOG('deleting %s from _data' % key)
                 del self._data[key]
-            for v in to_notify:
-                self.notifyDel(v)
+        finally:
+            self.gc_lock.release()
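
The key change here is that _gc no longer calls the delete notifier
itself: it only drops buckets whose timeslice has already been through
finalization.  As a toy sketch of that ordering guarantee:

    def gc_pass(data, last_finalized_ts):
        # every bucket keyed at or below last_finalized_ts has already
        # had notifyDel run for its items, so deletion is safe
        for ts in list(data.keys()):
            if ts <= last_finalized_ts:
                del data[ts]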
     def notifyAdd(self, item):
         DEBUG and TLOG('notifyAdd with %s' % item)
@@ -759,6 +888,9 @@ class TransientObjectContainer(SimpleItem):
             # create an Increaser for max timeslice
             self._max_timeslice = Increaser(max(new_slices))

+        if not state.has_key('_last_finalized_timeslice'):
+            self._last_finalized_timeslice = Increaser(-self._period)
+
         # we should probably delete older attributes from state such as
         # '_last_timeslice', '_deindex_next', and '__len__' here but we leave
         # them in order to allow people to switch between 2.6.0->2.7.0 and
......
@@ -109,10 +109,9 @@ class TestNotifications(TestBase):
         self.app.sm.setDelNotificationTarget(delNotificationTarget)
         sdo = self.app.sm.new_or_existing('TempObject')
         timeout = self.timeout * 60
-        fauxtime.sleep(timeout + (timeout * .75))
+        # sleep 2X longer than timeout? doesnt work at 1.1X, 1.5X?
+        fauxtime.sleep(timeout * 2)
         sdo1 = self.app.sm.get('TempObject')
-        # force the sdm to do housekeeping
-        self.app.sm._gc()
         now = fauxtime.time()
         k = sdo.get('endtime')
         self.assertEqual(type(k), type(now))
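
The 2X figure is consistent with the finalization lag (a
back-of-the-envelope sketch with assumed numbers; the exact multiple
that works also depends on which timeslice the object lands in and on
fauxtime rounding):

    period = 20          # assumed bucket period in (faux) seconds
    timeout_slices = 3   # a 60-second timeout
    timeout = period * timeout_slices

    # _finalize only reaches a bucket once now - period*(timeout_slices+1)
    # catches up to it, i.e. at least one full period after the timeout:
    bucket_ts = 0
    earliest_finalizing_now = bucket_ts + period * (timeout_slices + 1)
    assert earliest_finalizing_now == timeout + period  # 80 > 60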
......
@@ -249,7 +249,7 @@ class TestTransientObjectContainer(TestBase):
         self.assertEqual(len(self.t.keys()), 100)
         # call _gc just to make sure __len__ gets changed after a gc
-        self.t._gc()
+        #self.t._gc()
         self.assertEqual(len(self.t), 100)
         # we should still have 100 - 199
......