Commit 23a81608 authored by Tim Peters's avatar Tim Peters

Merge tim-simpler_connection branch.

There's no longer a hard limit on # of open connections per DB.

Introduced a sane scheme for raising deprecation warnings.
Sane ==

1. The machinery ensures that a "this will be removed in ZODB 3.6"
   blurb gets attached to all deprecation warnings.

and

2. It will dead easy to find these when it's time for 3.6.
parent cd89cdf2
...@@ -2,6 +2,33 @@ What's new in ZODB3 3.4? ...@@ -2,6 +2,33 @@ What's new in ZODB3 3.4?
======================== ========================
Release date: DD-MMM-2004 Release date: DD-MMM-2004
DB
--
- There is no longer a hard limit on the number of connections that
``DB.open()`` will create. In other words, ``DB.open()`` never blocks
anymore waiting for an earlier connection to close, and ``DB.open()``
always returns a connection now (while it wasn't documented, it was
possible for ``DB.open()`` to return ``None`` before).
``pool_size`` continues to default to 7, but its meaning has changed:
if more than ``pool_size`` connections are obtained from ``DB.open()``
and not closed, a warning is logged; if more than twice ``pool_size``, a
critical problem is logged. ``pool_size`` should be set to the maximum
number of connections from the ``DB`` instance you expect to have open
simultaneously.
In addition, if a connection obtained from ``DB.open()`` becomes
unreachable without having been explicitly closed, when Python's garbage
collection reclaims that connection it no longer counts against the
``pool_size`` thresholds for logging messages.
The following optional arguments to ``DB.open()`` are deprecated:
``transaction``, ``waitflag``, ``force`` and ``temporary``. If one
is specified, its value is ignored, and ``DeprecationWarning`` is
raised. In ZODB 3.6, these optional arguments will be removed.
Tools Tools
----- -----
......
...@@ -182,6 +182,7 @@ def copy_other_files(cmd, outputbase): ...@@ -182,6 +182,7 @@ def copy_other_files(cmd, outputbase):
"ZConfig/tests/library/widget", "ZConfig/tests/library/widget",
"ZEO", "ZEO",
"ZODB", "ZODB",
"ZODB/tests",
"zdaemon", "zdaemon",
"zdaemon/tests", "zdaemon/tests",
]: ]:
......
...@@ -34,6 +34,8 @@ from ZODB.TmpStore import TmpStore ...@@ -34,6 +34,8 @@ from ZODB.TmpStore import TmpStore
from ZODB.utils import u64, oid_repr, z64, positive_id from ZODB.utils import u64, oid_repr, z64, positive_id
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
from ZODB.interfaces import IConnection from ZODB.interfaces import IConnection
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
from zope.interface import implements from zope.interface import implements
global_reset_counter = 0 global_reset_counter = 0
...@@ -262,9 +264,8 @@ class Connection(ExportImport, object): ...@@ -262,9 +264,8 @@ class Connection(ExportImport, object):
method. You can pass a transaction manager (TM) to DB.open() method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses. to control which TM the Connection uses.
""" """
warnings.warn("getTransaction() is deprecated. " deprecated36("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.", "Use the txn_mgr argument to DB.open() instead.")
DeprecationWarning)
return self._txn_mgr.get() return self._txn_mgr.get()
def setLocalTransaction(self): def setLocalTransaction(self):
...@@ -276,9 +277,8 @@ class Connection(ExportImport, object): ...@@ -276,9 +277,8 @@ class Connection(ExportImport, object):
can pass a transaction manager (TM) to DB.open() to control can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses. which TM the Connection uses.
""" """
warnings.warn("setLocalTransaction() is deprecated. " deprecated36("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.", "Use the txn_mgr argument to DB.open() instead.")
DeprecationWarning)
if self._txn_mgr is transaction.manager: if self._txn_mgr is transaction.manager:
if self._synch: if self._synch:
self._txn_mgr.unregisterSynch(self) self._txn_mgr.unregisterSynch(self)
...@@ -486,14 +486,14 @@ class Connection(ExportImport, object): ...@@ -486,14 +486,14 @@ class Connection(ExportImport, object):
def cacheFullSweep(self, dt=None): def cacheFullSweep(self, dt=None):
# XXX needs doc string # XXX needs doc string
warnings.warn("cacheFullSweep is deprecated. " deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.", DeprecationWarning) "Use cacheMinimize instead.")
if dt is None: if dt is None:
self._cache.full_sweep() self._cache.full_sweep()
else: else:
self._cache.full_sweep(dt) self._cache.full_sweep(dt)
def cacheMinimize(self, dt=None): def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache. """Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn Call _p_deactivate() on each cached object, attempting to turn
...@@ -503,9 +503,8 @@ class Connection(ExportImport, object): ...@@ -503,9 +503,8 @@ class Connection(ExportImport, object):
:Parameters: :Parameters:
- `dt`: ignored. It is provided only for backwards compatibility. - `dt`: ignored. It is provided only for backwards compatibility.
""" """
if dt is not None: if dt is not DEPRECATED_ARGUMENT:
warnings.warn("The dt argument to cacheMinimize is ignored.", deprecated36("cacheMinimize() dt= is ignored.")
DeprecationWarning)
self._cache.minimize() self._cache.minimize()
def cacheGC(self): def cacheGC(self):
...@@ -781,8 +780,8 @@ class Connection(ExportImport, object): ...@@ -781,8 +780,8 @@ class Connection(ExportImport, object):
# an oid is being registered. I can't think of any way to # an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is # achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning. # a way, this will be a very confusing warning.
warnings.warn("Assigning to _p_jar is deprecated", deprecated36("Assigning to _p_jar is deprecated, and will be "
DeprecationWarning) "changed to raise an exception.")
elif obj._p_oid in self._added: elif obj._p_oid in self._added:
# It was registered before it was added to _added. # It was registered before it was added to _added.
return return
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
$Id$""" $Id$"""
import cPickle, cStringIO, sys import cPickle, cStringIO, sys
from thread import allocate_lock import threading
from time import time, ctime from time import time, ctime
import warnings import warnings
import logging import logging
...@@ -25,11 +25,117 @@ from ZODB.broken import find_global ...@@ -25,11 +25,117 @@ from ZODB.broken import find_global
from ZODB.utils import z64 from ZODB.utils import z64
from ZODB.Connection import Connection from ZODB.Connection import Connection
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.utils import WeakSet
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
import transaction import transaction
logger = logging.getLogger('ZODB.DB') logger = logging.getLogger('ZODB.DB')
class _ConnectionPool(object):
"""Manage a pool of connections.
CAUTION: Methods should be called under the protection of a lock.
This class does no locking of its own.
There's no limit on the number of connections this can keep track of,
but a warning is logged if there are more than pool_size active
connections, and a critical problem if more than twice pool_size.
New connections are registered via push(). This will log a message if
"too many" connections are active.
When a connection is explicitly closed, tell the pool via repush().
That adds the connection to a stack of connections available for
reuse, and throws away the oldest stack entries if the stack is too large.
pop() pops this stack.
When a connection is obtained via pop(), the pool holds only a weak
reference to it thereafter. It's not necessary to inform the pool
if the connection goes away. A connection handed out by pop() counts
against pool_size only so long as it exists, and provided it isn't
repush()'ed. A weak reference is retained so that DB methods like
connectionDebugInfo() can still gather statistics.
"""
def __init__(self, pool_size):
# The largest # of connections we expect to see alive simultaneously.
self.pool_size = pool_size
# A weak set of all connections we've seen. A connection vanishes
# from this set if pop() hands it out, it's not reregistered via
# repush(), and it becomes unreachable.
self.all = WeakSet()
# A stack of connections available to hand out. This is a subset
# of self.all. push() and repush() add to this, and may remove
# the oldest available connections if the pool is too large.
# pop() pops this stack. There are never more than pool_size entries
# in this stack.
# In Python 2.4, a collections.deque would make more sense than
# a list (we push only "on the right", but may pop from both ends).
self.available = []
# Change our belief about the expected maximum # of live connections.
# If the pool_size is smaller than the current value, this may discard
# the oldest available connections.
def set_pool_size(self, pool_size):
self.pool_size = pool_size
self._reduce_size()
# Register a new available connection. We must not know about c already.
# c will be pushed onto the available stack even if we're over the
# pool size limit.
def push(self, c):
assert c not in self.all
assert c not in self.available
self._reduce_size(strictly_less=True)
self.all.add(c)
self.available.append(c)
n, limit = len(self.all), self.pool_size
if n > limit:
reporter = logger.warn
if n > 2 * limit:
reporter = logger.critical
reporter("DB.open() has %s open connections with a pool_size "
"of %s", n, limit)
# Reregister an available connection formerly obtained via pop(). This
# pushes it on the stack of available connections, and may discard
# older available connections.
def repush(self, c):
assert c in self.all
assert c not in self.available
self._reduce_size(strictly_less=True)
self.available.append(c)
# Throw away the oldest available connections until we're under our
# target size (strictly_less=False) or no more than that (strictly_less=
# True, the default).
def _reduce_size(self, strictly_less=False):
target = self.pool_size - bool(strictly_less)
while len(self.available) > target:
c = self.available.pop(0)
self.all.remove(c)
# Pop an available connection and return it, or return None if none are
# available. In the latter case, the caller should create a new
# connection, register it via push(), and call pop() again. The
# caller is responsible for serializing this sequence.
def pop(self):
result = None
if self.available:
result = self.available.pop()
# Leave it in self.all, so we can still get at it for statistics
# while it's alive.
assert result in self.all
return result
# Return a list of all connections we currently know about.
def all_as_list(self):
return self.all.as_list()
class DB(object): class DB(object):
"""The Object Database """The Object Database
------------------- -------------------
...@@ -41,9 +147,9 @@ class DB(object): ...@@ -41,9 +147,9 @@ class DB(object):
The DB instance manages a pool of connections. If a connection is The DB instance manages a pool of connections. If a connection is
closed, it is returned to the pool and its object cache is closed, it is returned to the pool and its object cache is
preserved. A subsequent call to open() will reuse the connection. preserved. A subsequent call to open() will reuse the connection.
There is a limit to the pool size; if all its connections are in There is no hard limit on the pool size. If more than `pool_size`
use, calls to open() will block until one of the open connections connections are opened, a warning is logged, and if more than twice
is closed. that many, a critical problem is logged.
The class variable 'klass' is used by open() to create database The class variable 'klass' is used by open() to create database
connections. It is set to Connection, but a subclass could override connections. It is set to Connection, but a subclass could override
...@@ -81,41 +187,42 @@ class DB(object): ...@@ -81,41 +187,42 @@ class DB(object):
def __init__(self, storage, def __init__(self, storage,
pool_size=7, pool_size=7,
cache_size=400, cache_size=400,
cache_deactivate_after=None, cache_deactivate_after=DEPRECATED_ARGUMENT,
version_pool_size=3, version_pool_size=3,
version_cache_size=100, version_cache_size=100,
version_cache_deactivate_after=None, version_cache_deactivate_after=DEPRECATED_ARGUMENT,
): ):
"""Create an object database. """Create an object database.
:Parameters: :Parameters:
- `storage`: the storage used by the database, e.g. FileStorage - `storage`: the storage used by the database, e.g. FileStorage
- `pool_size`: maximum number of open connections - `pool_size`: expected maximum number of open connections
- `cache_size`: target size of Connection object cache - `cache_size`: target size of Connection object cache
- `cache_deactivate_after`: ignored - `version_pool_size`: expected maximum number of connections (per
- `version_pool_size`: maximum number of connections (per version) version)
- `version_cache_size`: target size of Connection object cache for - `version_cache_size`: target size of Connection object cache for
version connections version connections
- `cache_deactivate_after`: ignored
- `version_cache_deactivate_after`: ignored - `version_cache_deactivate_after`: ignored
""" """
# Allocate locks: # Allocate lock.
l = allocate_lock() x = threading.RLock()
self._a = l.acquire self._a = x.acquire
self._r = l.release self._r = x.release
# Setup connection pools and cache info # Setup connection pools and cache info
self._pools = {},[] # _pools maps a version string to a _ConnectionPool object.
self._temps = [] self._pools = {}
self._pool_size = pool_size self._pool_size = pool_size
self._cache_size = cache_size self._cache_size = cache_size
self._version_pool_size = version_pool_size self._version_pool_size = version_pool_size
self._version_cache_size = version_cache_size self._version_cache_size = version_cache_size
# warn about use of deprecated arguments # warn about use of deprecated arguments
if (cache_deactivate_after is not None or if cache_deactivate_after is not DEPRECATED_ARGUMENT:
version_cache_deactivate_after is not None): deprecated36("cache_deactivate_after has no effect")
warnings.warn("cache_deactivate_after has no effect", if version_cache_deactivate_after is not DEPRECATED_ARGUMENT:
DeprecationWarning) deprecated36("version_cache_deactivate_after has no effect")
self._miv_cache = {} self._miv_cache = {}
...@@ -151,6 +258,7 @@ class DB(object): ...@@ -151,6 +258,7 @@ class DB(object):
if hasattr(storage, 'undoInfo'): if hasattr(storage, 'undoInfo'):
self.undoInfo = storage.undoInfo self.undoInfo = storage.undoInfo
# This is called by Connection.close().
def _closeConnection(self, connection): def _closeConnection(self, connection):
"""Return a connection to the pool. """Return a connection to the pool.
...@@ -165,10 +273,10 @@ class DB(object): ...@@ -165,10 +273,10 @@ class DB(object):
am = self._activity_monitor am = self._activity_monitor
if am is not None: if am is not None:
am.closedConnection(connection) am.closedConnection(connection)
version = connection._version version = connection._version
pools, pooll = self._pools
try: try:
pool, allocated, pool_lock = pools[version] pool = self._pools[version]
except KeyError: except KeyError:
# No such version. We must have deleted the pool. # No such version. We must have deleted the pool.
# Just let the connection go. # Just let the connection go.
...@@ -177,30 +285,18 @@ class DB(object): ...@@ -177,30 +285,18 @@ class DB(object):
# XXX What objects are involved in the cycle? # XXX What objects are involved in the cycle?
connection.__dict__.clear() connection.__dict__.clear()
return return
pool.repush(connection)
pool.append(connection)
if len(pool) == 1:
# Pool now usable again, unlock it.
pool_lock.release()
finally: finally:
self._r() self._r()
# Call f(c) for all connections c in all pools in all versions.
def _connectionMap(self, f): def _connectionMap(self, f):
self._a() self._a()
try: try:
pools, pooll = self._pools for pool in self._pools.values():
for pool, allocated in pooll: for c in pool.all_as_list():
for cc in allocated: f(c)
f(cc)
temps = self._temps
if temps:
t = []
rc = sys.getrefcount
for cc in temps:
if rc(cc) > 3:
f(cc)
self._temps = t
finally: finally:
self._r() self._r()
...@@ -216,12 +312,12 @@ class DB(object): ...@@ -216,12 +312,12 @@ class DB(object):
""" """
detail = {} detail = {}
def f(con, detail=detail, have_detail=detail.has_key): def f(con, detail=detail):
for oid, ob in con._cache.items(): for oid, ob in con._cache.items():
module = getattr(ob.__class__, '__module__', '') module = getattr(ob.__class__, '__module__', '')
module = module and '%s.' % module or '' module = module and '%s.' % module or ''
c = "%s%s" % (module, ob.__class__.__name__) c = "%s%s" % (module, ob.__class__.__name__)
if have_detail(c): if c in detail:
detail[c] += 1 detail[c] += 1
else: else:
detail[c] = 1 detail[c] = 1
...@@ -276,7 +372,7 @@ class DB(object): ...@@ -276,7 +372,7 @@ class DB(object):
self._connectionMap(lambda c: c._cache.full_sweep()) self._connectionMap(lambda c: c._cache.full_sweep())
def cacheLastGCTime(self): def cacheLastGCTime(self):
m=[0] m = [0]
def f(con, m=m): def f(con, m=m):
t = con._cache.cache_last_gc_time t = con._cache.cache_last_gc_time
if t > m[0]: if t > m[0]:
...@@ -289,7 +385,7 @@ class DB(object): ...@@ -289,7 +385,7 @@ class DB(object):
self._connectionMap(lambda c: c._cache.minimize()) self._connectionMap(lambda c: c._cache.minimize())
def cacheSize(self): def cacheSize(self):
m=[0] m = [0]
def f(con, m=m): def f(con, m=m):
m[0] += con._cache.cache_non_ghost_count m[0] += con._cache.cache_non_ghost_count
...@@ -299,9 +395,9 @@ class DB(object): ...@@ -299,9 +395,9 @@ class DB(object):
def cacheDetailSize(self): def cacheDetailSize(self):
m = [] m = []
def f(con, m=m): def f(con, m=m):
m.append({'connection':repr(con), m.append({'connection': repr(con),
'ngsize':con._cache.cache_non_ghost_count, 'ngsize': con._cache.cache_non_ghost_count,
'size':len(con._cache)}) 'size': len(con._cache)})
self._connectionMap(f) self._connectionMap(f)
m.sort() m.sort()
return m return m
...@@ -358,39 +454,24 @@ class DB(object): ...@@ -358,39 +454,24 @@ class DB(object):
if connection is not None: if connection is not None:
version = connection._version version = connection._version
# Update modified in version cache # Update modified in version cache
# XXX must make this work with list or dict to backport to 2.6
for oid in oids.keys(): for oid in oids.keys():
h = hash(oid) % 131 h = hash(oid) % 131
o = self._miv_cache.get(h, None) o = self._miv_cache.get(h, None)
if o is not None and o[0]==oid: if o is not None and o[0]==oid:
del self._miv_cache[h] del self._miv_cache[h]
# Notify connections # Notify connections.
for pool, allocated in self._pools[1]: def inval(c):
for cc in allocated: if (c is not connection and
if (cc is not connection and (not version or c._version == version)):
(not version or cc._version==version)): c.invalidate(tid, oids)
if sys.getrefcount(cc) <= 3: self._connectionMap(inval)
cc.close()
cc.invalidate(tid, oids)
if self._temps:
t = []
for cc in self._temps:
if sys.getrefcount(cc) > 3:
if (cc is not connection and
(not version or cc._version == version)):
cc.invalidate(tid, oids)
t.append(cc)
else:
cc.close()
self._temps = t
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
h = hash(oid) % 131 h = hash(oid) % 131
cache = self._miv_cache cache = self._miv_cache
o=cache.get(h, None) o = cache.get(h, None)
if o and o[0]==oid: if o and o[0] == oid:
return o[1] return o[1]
v = self._storage.modifiedInVersion(oid) v = self._storage.modifiedInVersion(oid)
cache[h] = oid, v cache[h] = oid, v
...@@ -399,202 +480,107 @@ class DB(object): ...@@ -399,202 +480,107 @@ class DB(object):
def objectCount(self): def objectCount(self):
return len(self._storage) return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None, def open(self, version='',
waitflag=1, mvcc=True, txn_mgr=None, synch=True): transaction=DEPRECATED_ARGUMENT, temporary=DEPRECATED_ARGUMENT,
force=DEPRECATED_ARGUMENT, waitflag=DEPRECATED_ARGUMENT,
mvcc=True, txn_mgr=None, synch=True):
"""Return a database Connection for use by application code. """Return a database Connection for use by application code.
The optional version argument can be used to specify that a The optional `version` argument can be used to specify that a
version connection is desired. version connection is desired.
The optional transaction argument can be provided to cause the
connection to be automatically closed when a transaction is
terminated. In addition, connections per transaction are
reused, if possible.
Note that the connection pool is managed as a stack, to Note that the connection pool is managed as a stack, to
increate the likelihood that the connection's stack will increase the likelihood that the connection's stack will
include useful objects. include useful objects.
:Parameters: :Parameters:
- `version`: the "version" that all changes will be made - `version`: the "version" that all changes will be made
in, defaults to no version. in, defaults to no version.
- `transaction`: XXX
- `temporary`: XXX
- `force`: XXX
- `waitflag`: XXX
- `mvcc`: boolean indicating whether MVCC is enabled - `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means - `txn_mgr`: transaction manager to use. None means
used the default transaction manager. used the default transaction manager.
- `synch`: boolean indicating whether Connection should - `synch`: boolean indicating whether Connection should
register for afterCompletion() calls. register for afterCompletion() calls.
""" """
if temporary is not DEPRECATED_ARGUMENT:
deprecated36("DB.open() temporary= ignored. "
"open() no longer blocks.")
if force is not DEPRECATED_ARGUMENT:
deprecated36("DB.open() force= ignored. "
"open() no longer blocks.")
if waitflag is not DEPRECATED_ARGUMENT:
deprecated36("DB.open() waitflag= ignored. "
"open() no longer blocks.")
if transaction is not DEPRECATED_ARGUMENT:
deprecated36("DB.open() transaction= ignored.")
self._a() self._a()
try: try:
# pool <- the _ConnectionPool for this version
pool = self._pools.get(version)
if pool is None:
if version:
size = self._version_pool_size
else:
size = self._pool_size
self._pools[version] = pool = _ConnectionPool(size)
assert pool is not None
if transaction is not None: # result <- a connection
connections = transaction._connections result = pool.pop()
if connections: if result is None:
if connections.has_key(version) and not temporary: if version:
return connections[version] size = self._version_cache_size
else: else:
transaction._connections = connections = {} size = self._cache_size
transaction = transaction._connections c = self.klass(version=version, cache_size=size,
mvcc=mvcc, txn_mgr=txn_mgr)
if temporary: pool.push(c)
# This is a temporary connection. result = pool.pop()
# We won't bother with the pools. This will be assert result is not None
# a one-use connection.
c = self.klass(version=version,
cache_size=self._version_cache_size,
mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
c._setDB(self)
self._temps.append(c)
if transaction is not None:
transaction[id(c)] = c
return c
pools, pooll = self._pools
# pools is a mapping object:
#
# {version -> (pool, allocated, lock)
#
# where:
#
# pool is the connection pool for the version,
# allocated is a list of all of the allocated
# connections, and
# lock is a lock that is used to block when a pool is
# empty and no more connections can be allocated.
#
# pooll is a list of all of the pools and allocated for
# use in cases where we need to iterate over all
# connections or all inactive connections.
# Pool locks are tricky. Basically, the lock needs to be
# set whenever the pool becomes empty so that threads are
# forced to wait until the pool gets a connection in it.
# The lock is acquired when the (empty) pool is
# created. The lock is acquired just prior to removing
# the last connection from the pool and released just after
# adding a connection to an empty pool.
if pools.has_key(version):
pool, allocated, pool_lock = pools[version]
else:
pool, allocated, pool_lock = pools[version] = (
[], [], allocate_lock())
pooll.append((pool, allocated))
pool_lock.acquire()
# Tell the connection it belongs to self.
result._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
if not pool: # A good time to do some cache cleanup.
c = None self._connectionMap(lambda c: c.cacheGC())
if version:
if self._version_pool_size > len(allocated) or force: return result
c = self.klass(version=version,
cache_size=self._version_cache_size,
mvcc=mvcc, txn_mgr=txn_mgr)
allocated.append(c)
pool.append(c)
elif self._pool_size > len(allocated) or force:
c = self.klass(version=version,
cache_size=self._cache_size,
mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
allocated.append(c)
pool.append(c)
if c is None:
if waitflag:
self._r()
pool_lock.acquire()
self._a()
if len(pool) > 1:
# Note that the pool size will normally be 1 here,
# but it could be higher due to a race condition.
pool_lock.release()
else:
return
elif len(pool)==1:
# Taking last one, lock the pool.
# Note that another thread might grab the lock
# before us, so we might actually block, however,
# when we get the lock back, there *will* be a
# connection in the pool. OTOH, there's no limit on
# how long we may need to wait: if the other thread
# grabbed the lock in this section too, we'll wait
# here until another connection is closed.
# checkConcurrentUpdates1Storage provoked this frequently
# on a hyperthreaded machine, with its second thread
# timing out after waiting 5 minutes for DB.open() to
# return. So, if we can't get the pool lock immediately,
# now we make a recursive call. This allows the current
# thread to allocate a new connection instead of waiting
# arbitrarily long for the single connection in the pool
# right now.
self._r()
if not pool_lock.acquire(0):
result = DB.open(self, version, transaction, temporary,
force, waitflag)
self._a()
return result
self._a()
if len(pool) > 1:
# Note that the pool size will normally be 1 here,
# but it could be higher due to a race condition.
pool_lock.release()
c = pool.pop()
c._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
for pool, allocated in pooll:
for cc in pool:
cc.cacheGC()
if transaction is not None:
transaction[version] = c
return c
finally: finally:
self._r() self._r()
def removeVersionPool(self, version): def removeVersionPool(self, version):
pools, pooll = self._pools try:
info = pools.get(version) del self._pools[version]
if info: except KeyError:
del pools[version] pass
pool, allocated, pool_lock = info
pooll.remove((pool, allocated))
try:
pool_lock.release()
except: # XXX Do we actually expect this to fail?
pass
del pool[:]
del allocated[:]
def connectionDebugInfo(self): def connectionDebugInfo(self):
r = [] result = []
pools, pooll = self._pools
t = time() t = time()
for version, (pool, allocated, lock) in pools.items(): def f(c):
for c in allocated: o = c._opened
o = c._opened d = c._debug_info
d = c._debug_info if d:
if d: if len(d) == 1:
if len(d)==1: d = d[0]
d = d[0] else:
else: d = ''
d='' d = "%s (%s)" % (d, len(c._cache))
d = "%s (%s)" % (d, len(c._cache))
r.append({ result.append({
'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)), 'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
'info': d, 'info': d,
'version': version, 'version': version,
}) })
return r
self._connectionMap(f)
return result
def getActivityMonitor(self): def getActivityMonitor(self):
return self._activity_monitor return self._activity_monitor
...@@ -623,33 +609,51 @@ class DB(object): ...@@ -623,33 +609,51 @@ class DB(object):
logger.error("packing", exc_info=True) logger.error("packing", exc_info=True)
raise raise
def setCacheSize(self, v): def setActivityMonitor(self, am):
self._cache_size = v self._activity_monitor = am
d = self._pools[0]
pool_info = d.get('')
if pool_info is not None:
for c in pool_info[1]:
c._cache.cache_size = v
def classFactory(self, connection, modulename, globalname): def classFactory(self, connection, modulename, globalname):
# Zope will rebind this method to arbitrary user code at runtime. # Zope will rebind this method to arbitrary user code at runtime.
return find_global(modulename, globalname) return find_global(modulename, globalname)
def setPoolSize(self, v): def setCacheSize(self, v):
self._pool_size = v self._a()
try:
def setActivityMonitor(self, am): self._cache_size = v
self._activity_monitor = am pool = self._pools.get('')
if pool is not None:
for c in pool.all_as_list():
c._cache.cache_size = v
finally:
self._r()
def setVersionCacheSize(self, v): def setVersionCacheSize(self, v):
self._version_cache_size = v self._a()
for ver in self._pools[0].keys(): try:
if ver: self._version_cache_size = v
for c in self._pools[0][ver][1]: for version, pool in self._pools.items():
c._cache.cache_size = v if version:
for c in pool.all_as_list():
c._cache.cache_size = v
finally:
self._r()
def setPoolSize(self, size):
self._pool_size = size
self._reset_pool_sizes(size, for_versions=False)
def setVersionPoolSize(self, v): def setVersionPoolSize(self, size):
self._version_pool_size=v self._version_pool_size = size
self._reset_pool_sizes(size, for_versions=True)
def _reset_pool_sizes(self, size, for_versions=False):
self._a()
try:
for version, pool in self._pools.items():
if (version != '') == for_versions:
pool.set_pool_size(size)
finally:
self._r()
def undo(self, id, txn=None): def undo(self, id, txn=None):
"""Undo a transaction identified by id. """Undo a transaction identified by id.
...@@ -679,23 +683,19 @@ class DB(object): ...@@ -679,23 +683,19 @@ class DB(object):
def getCacheDeactivateAfter(self): def getCacheDeactivateAfter(self):
"""Deprecated""" """Deprecated"""
warnings.warn("cache_deactivate_after has no effect", deprecated36("getCacheDeactivateAfter has no effect")
DeprecationWarning)
def getVersionCacheDeactivateAfter(self): def getVersionCacheDeactivateAfter(self):
"""Deprecated""" """Deprecated"""
warnings.warn("cache_deactivate_after has no effect", deprecated36("getVersionCacheDeactivateAfter has no effect")
DeprecationWarning)
def setCacheDeactivateAfter(self, v): def setCacheDeactivateAfter(self, v):
"""Deprecated""" """Deprecated"""
warnings.warn("cache_deactivate_after has no effect", deprecated36("setCacheDeactivateAfter has no effect")
DeprecationWarning)
def setVersionCacheDeactivateAfter(self, v): def setVersionCacheDeactivateAfter(self, v):
"""Deprecated""" """Deprecated"""
warnings.warn("cache_deactivate_after has no effect", deprecated36("setVersionCacheDeactivateAfter has no effect")
DeprecationWarning)
class ResourceManager(object): class ResourceManager(object):
"""Transaction participation for a version or undo resource.""" """Transaction participation for a version or undo resource."""
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
Here we exercise the connection management done by the DB class.
>>> from ZODB import DB
>>> from ZODB.MappingStorage import MappingStorage as Storage
Capturing log messages from DB is important for some of the examples:
>>> from zope.testing.loggingsupport import InstalledHandler
>>> handler = InstalledHandler('ZODB.DB')
Create a storage, and wrap it in a DB wrapper:
>>> st = Storage()
>>> db = DB(st)
By default, we can open 7 connections without any log messages:
>>> conns = [db.open() for dummy in range(7)]
>>> handler.records
[]
Open one more, and we get a warning:
>>> conns.append(db.open())
>>> len(handler.records)
1
>>> msg = handler.records[0]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB WARNING DB.open() has 8 open connections with a pool_size of 7
Open 6 more, and we get 6 more warnings:
>>> conns.extend([db.open() for dummy in range(6)])
>>> len(conns)
14
>>> len(handler.records)
7
>>> msg = handler.records[-1]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB WARNING DB.open() has 14 open connections with a pool_size of 7
Add another, so that it's more than twice the default, and the level
rises to critical:
>>> conns.append(db.open())
>>> len(conns)
15
>>> len(handler.records)
8
>>> msg = handler.records[-1]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB CRITICAL DB.open() has 15 open connections with a pool_size of 7
While it's boring, it's important to verify that the same relationships
hold if the default pool size is overridden.
>>> handler.clear()
>>> st.close()
>>> st = Storage()
>>> PS = 2 # smaller pool size
>>> db = DB(st, pool_size=PS)
>>> conns = [db.open() for dummy in range(PS)]
>>> handler.records
[]
A warning for opening one more:
>>> conns.append(db.open())
>>> len(handler.records)
1
>>> msg = handler.records[0]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB WARNING DB.open() has 3 open connections with a pool_size of 2
More warnings through 4 connections:
>>> conns.extend([db.open() for dummy in range(PS-1)])
>>> len(conns)
4
>>> len(handler.records)
2
>>> msg = handler.records[-1]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB WARNING DB.open() has 4 open connections with a pool_size of 2
And critical for going beyond that:
>>> conns.append(db.open())
>>> len(conns)
5
>>> len(handler.records)
3
>>> msg = handler.records[-1]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB CRITICAL DB.open() has 5 open connections with a pool_size of 2
We can change the pool size on the fly:
>>> handler.clear()
>>> db.setPoolSize(6)
>>> conns.append(db.open())
>>> handler.records # no log msg -- the pool is bigger now
[]
>>> conns.append(db.open()) # but one more and there's a warning again
>>> len(handler.records)
1
>>> msg = handler.records[0]
>>> print msg.name, msg.levelname, msg.getMessage()
ZODB.DB WARNING DB.open() has 7 open connections with a pool_size of 6
Enough of that.
>>> handler.clear()
>>> st.close()
More interesting is the stack-like nature of connection reuse. So long as
we keep opening new connections, and keep them alive, all connections
returned are distinct:
>>> st = Storage()
>>> db = DB(st)
>>> c1 = db.open()
>>> c2 = db.open()
>>> c3 = db.open()
>>> c1 is c2 or c1 is c3 or c2 is c3
False
Let's put some markers on the connections, so we can identify these
specific objects later:
>>> c1.MARKER = 'c1'
>>> c2.MARKER = 'c2'
>>> c3.MARKER = 'c3'
Now explicitly close c1 and c2:
>>> c1.close()
>>> c2.close()
Reaching into the internals, we can see that db's connection pool now has
two connections available for reuse, and knows about three connections in
all:
>>> pool = db._pools['']
>>> len(pool.available)
2
>>> len(pool.all)
3
Since we closed c2 last, it's at the top of the available stack, so will
be reused by the next open():
>>> c1 = db.open()
>>> c1.MARKER
'c2'
>>> len(pool.available), len(pool.all)
(1, 3)
>>> c3.close() # now the stack has c3 on top, then c1
>>> c2 = db.open()
>>> c2.MARKER
'c3'
>>> len(pool.available), len(pool.all)
(1, 3)
>>> c3 = db.open()
>>> c3.MARKER
'c1'
>>> len(pool.available), len(pool.all)
(0, 3)
What about the 3 in pool.all? We've seen that closing connections doesn't
reduce pool.all, and it would be bad if DB kept connections alive forever.
In fact pool.all is a "weak set" of connections -- it holds weak references
to connections. That alone doesn't keep connection objects alive. The
weak set allows DB's statistics methods to return info about connections
that are still alive.
>>> len(db.cacheDetailSize()) # one result for each connection's cache
3
If a connection object is abandoned (it becomes unreachable), then it
will vanish from pool.all automatically. However, connections are
involved in cycles, so exactly when a connection vanishes from pool.all
isn't predictable. It can be forced by running gc.collect():
>>> import gc
>>> dummy = gc.collect()
>>> len(pool.all)
3
>>> c3 = None
>>> dummy = gc.collect() # removes c3 from pool.all
>>> len(pool.all)
2
Note that c3 is really gone; in particular it didn't get added back to
the stack of available connections by magic:
>>> len(pool.available)
0
Nothing in that last block should have logged any msgs:
>>> handler.records
[]
If "too many" connections are open, then closing one may kick an older
closed one out of the available connection stack.
>>> st.close()
>>> st = Storage()
>>> db = DB(st, pool_size=3)
>>> conns = [db.open() for dummy in range(6)]
>>> len(handler.records) # 3 warnings for the "excess" connections
3
>>> pool = db._pools['']
>>> len(pool.available), len(pool.all)
(0, 6)
Let's mark them:
>>> for i, c in enumerate(conns):
... c.MARKER = i
Closing connections adds them to the stack:
>>> for i in range(3):
... conns[i].close()
>>> len(pool.available), len(pool.all)
(3, 6)
>>> del conns[:3] # leave the ones with MARKERs 3, 4 and 5
Closing another one will purge the one with MARKER 0 from the stack
(since it was the first added to the stack):
>>> [c.MARKER for c in pool.available]
[0, 1, 2]
>>> conns[0].close() # MARKER 3
>>> len(pool.available), len(pool.all)
(3, 5)
>>> [c.MARKER for c in pool.available]
[1, 2, 3]
Similarly for the other two:
>>> conns[1].close(); conns[2].close()
>>> len(pool.available), len(pool.all)
(3, 3)
>>> [c.MARKER for c in pool.available]
[3, 4, 5]
Reducing the pool size may also purge the oldest closed connections:
>>> db.setPoolSize(2) # gets rid of MARKER 3
>>> len(pool.available), len(pool.all)
(2, 2)
>>> [c.MARKER for c in pool.available]
[4, 5]
Since MARKER 5 is still the last one added to the stack, it will be the
first popped:
>>> c1 = db.open(); c2 = db.open()
>>> c1.MARKER, c2.MARKER
(5, 4)
>>> len(pool.available), len(pool.all)
(0, 2)
Clean up.
>>> st.close()
>>> handler.uninstall()
...@@ -414,8 +414,9 @@ class UserMethodTests(unittest.TestCase): ...@@ -414,8 +414,9 @@ class UserMethodTests(unittest.TestCase):
>>> len(hook.warnings) >>> len(hook.warnings)
1 1
>>> message, category, filename, lineno = hook.warnings[0] >>> message, category, filename, lineno = hook.warnings[0]
>>> message >>> print message
'The dt argument to cacheMinimize is ignored.' This will be removed in ZODB 3.6:
cacheMinimize() dt= is ignored.
>>> category.__name__ >>> category.__name__
'DeprecationWarning' 'DeprecationWarning'
>>> hook.clear() >>> hook.clear()
...@@ -434,8 +435,9 @@ class UserMethodTests(unittest.TestCase): ...@@ -434,8 +435,9 @@ class UserMethodTests(unittest.TestCase):
>>> len(hook.warnings) >>> len(hook.warnings)
2 2
>>> message, category, filename, lineno = hook.warnings[0] >>> message, category, filename, lineno = hook.warnings[0]
>>> message >>> print message
'cacheFullSweep is deprecated. Use cacheMinimize instead.' This will be removed in ZODB 3.6:
cacheFullSweep is deprecated. Use cacheMinimize instead.
>>> category.__name__ >>> category.__name__
'DeprecationWarning' 'DeprecationWarning'
>>> message, category, filename, lineno = hook.warnings[1] >>> message, category, filename, lineno = hook.warnings[1]
......
...@@ -23,6 +23,10 @@ import ZODB.FileStorage ...@@ -23,6 +23,10 @@ import ZODB.FileStorage
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
# Return total number of connections across all pools in a db._pools.
def nconn(pools):
return sum([len(pool.all) for pool in pools.values()])
class DBTests(unittest.TestCase): class DBTests(unittest.TestCase):
def setUp(self): def setUp(self):
...@@ -75,22 +79,22 @@ class DBTests(unittest.TestCase): ...@@ -75,22 +79,22 @@ class DBTests(unittest.TestCase):
c12.close() # return to pool c12.close() # return to pool
self.assert_(c1 is c12) # should be same self.assert_(c1 is c12) # should be same
pools, pooll = self.db._pools pools = self.db._pools
self.assertEqual(len(pools), 3) self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3) self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1') self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2) self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2) self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1') c12 = self.db.open('v1')
c12.close() # return to pool c12.close() # return to pool
self.assert_(c1 is not c12) # should be different self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3) self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3) self.assertEqual(nconn(pools), 3)
def _test_for_leak(self): def _test_for_leak(self):
self.dowork() self.dowork()
...@@ -112,27 +116,27 @@ class DBTests(unittest.TestCase): ...@@ -112,27 +116,27 @@ class DBTests(unittest.TestCase):
c12 = self.db.open('v1') c12 = self.db.open('v1')
self.assert_(c1 is c12) # should be same self.assert_(c1 is c12) # should be same
pools, pooll = self.db._pools pools = self.db._pools
self.assertEqual(len(pools), 3) self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3) self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1') self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2) self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2) self.assertEqual(nconn(pools), 2)
c12.close() # should leave pools alone c12.close() # should leave pools alone
self.assertEqual(len(pools), 2) self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2) self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1') c12 = self.db.open('v1')
c12.close() # return to pool c12.close() # return to pool
self.assert_(c1 is not c12) # should be different self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3) self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3) self.assertEqual(nconn(pools), 3)
def test_suite(): def test_suite():
......
...@@ -243,9 +243,13 @@ class ZODBTests(unittest.TestCase): ...@@ -243,9 +243,13 @@ class ZODBTests(unittest.TestCase):
self.assertEqual(r1['item'], 2) self.assertEqual(r1['item'], 2)
self.assertEqual(r2['item'], 2) self.assertEqual(r2['item'], 2)
for msg, obj, filename, lineno in hook.warnings: for msg, obj, filename, lineno in hook.warnings:
self.assert_( self.assert_(msg in [
msg.startswith("setLocalTransaction() is deprecated.") or "This will be removed in ZODB 3.6:\n"
msg.startswith("getTransaction() is deprecated.")) "setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.",
"This will be removed in ZODB 3.6:\n"
"getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead."])
finally: finally:
conn1.close() conn1.close()
conn2.close() conn2.close()
......
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from zope.testing.doctestunit import DocFileSuite
def test_suite():
return DocFileSuite("dbopen.txt")
...@@ -18,6 +18,8 @@ from struct import pack, unpack ...@@ -18,6 +18,8 @@ from struct import pack, unpack
from binascii import hexlify from binascii import hexlify
import cPickle import cPickle
import cStringIO import cStringIO
import weakref
import warnings
from persistent.TimeStamp import TimeStamp from persistent.TimeStamp import TimeStamp
...@@ -34,8 +36,27 @@ __all__ = ['z64', ...@@ -34,8 +36,27 @@ __all__ = ['z64',
'positive_id', 'positive_id',
'get_refs', 'get_refs',
'readable_tid_repr', 'readable_tid_repr',
'WeakSet',
'DEPRECATED_ARGUMENT',
'deprecated36',
] ]
# A unique marker to give as the default value for a deprecated argument.
# The method should then do a
#
# if that_arg is not DEPRECATED_ARGUMENT:
# complain
#
# dance.
DEPRECATED_ARGUMENT = object()
# Raise DeprecationWarning, noting that the deprecated thing will go
# away in ZODB 3.6. Point to the caller of our caller (i.e., at the
# code using the deprecated thing).
def deprecated36(msg):
warnings.warn("This will be removed in ZODB 3.6:\n%s" % msg,
DeprecationWarning, stacklevel=3)
z64 = '\0'*8 z64 = '\0'*8
# TODO The purpose of t32 is unclear. Code that uses it is usually # TODO The purpose of t32 is unclear. Code that uses it is usually
...@@ -164,3 +185,46 @@ def get_refs(pickle): ...@@ -164,3 +185,46 @@ def get_refs(pickle):
u.noload() # class info u.noload() # class info
u.noload() # instance state info u.noload() # instance state info
return refs return refs
# A simple implementation of weak sets, supplying just enough of Python's
# sets.Set interface for our needs.
class WeakSet(object):
"""A set of objects that doesn't keep its elements alive.
The objects in the set must be weakly referencable.
The objects need not be hashable, and need not support comparison.
Two objects are considered to be the same iff their id()s are equal.
When the only references to an object are weak references (including
those from WeakSets), the object can be garbage-collected, and
will vanish from any WeakSets it may be a member of at that time.
"""
def __init__(self):
# Map id(obj) to obj. By using ids as keys, we avoid requiring
# that the elements be hashable or comparable.
self.data = weakref.WeakValueDictionary()
def __len__(self):
return len(self.data)
def __contains__(self, obj):
return id(obj) in self.data
# Same as a Set, add obj to the collection.
def add(self, obj):
self.data[id(obj)] = obj
# Same as a Set, remove obj from the collection, and raise
# KeyError if obj not in the collection.
def remove(self, obj):
del self.data[id(obj)]
# Return a list of all the objects in the collection.
# Because a weak dict is used internally, iteration
# is dicey (the underlying dict may change size during
# iteration, due to gc or activity from other threads).
# as_list() attempts to be safe.
def as_list(self):
return self.data.values()
...@@ -11,44 +11,16 @@ ...@@ -11,44 +11,16 @@
# FOR A PARTICULAR PURPOSE. # FOR A PARTICULAR PURPOSE.
# #
############################################################################## ##############################################################################
import doctest
import os
import sys
import unittest
import persistent.tests
from persistent import Persistent from persistent import Persistent
from zope.testing.doctestunit import DocFileSuite
class P(Persistent): class P(Persistent):
def __init__(self): def __init__(self):
self.x = 0 self.x = 0
def inc(self): def inc(self):
self.x += 1 self.x += 1
try:
DocFileSuite = doctest.DocFileSuite # >= Python 2.4.0a2
except AttributeError:
# <= Python 2.4.0a1
def DocFileSuite(path, globs=None):
# It's not entirely obvious how to connection this single string
# with unittest. For now, re-use the _utest() function that comes
# standard with doctest in Python 2.3. One problem is that the
# error indicator doesn't point to the line of the doctest file
# that failed.
path = os.path.join(persistent.tests.__path__[0], path)
source = open(path).read()
if globs is None:
globs = sys._getframe(1).f_globals
t = doctest.Tester(globs=globs)
def runit():
doctest._utest(t, path, source, path, 0)
f = unittest.FunctionTestCase(runit,
description="doctest from %s" % path)
suite = unittest.TestSuite()
suite.addTest(f)
return suite
def test_suite(): def test_suite():
return DocFileSuite("persistent.txt", globs={"P": P}) return DocFileSuite("persistent.txt", globs={"P": P})
...@@ -18,7 +18,6 @@ are associated with the right transaction. ...@@ -18,7 +18,6 @@ are associated with the right transaction.
""" """
import thread import thread
import weakref
from transaction._transaction import Transaction from transaction._transaction import Transaction
...@@ -28,48 +27,16 @@ from transaction._transaction import Transaction ...@@ -28,48 +27,16 @@ from transaction._transaction import Transaction
# practice not to explicitly close Connection objects, and keeping # practice not to explicitly close Connection objects, and keeping
# a Connection alive keeps a potentially huge number of other objects # a Connection alive keeps a potentially huge number of other objects
# alive (e.g., the cache, and everything reachable from it too). # alive (e.g., the cache, and everything reachable from it too).
# Therefore we use "weak sets" internally.
# #
# Therefore we use "weak sets" internally. The implementation here # Obscure: because of the __init__.py maze, we can't import WeakSet
# implements just enough of Python's sets.Set interface for our needs. # at top level here.
class WeakSet(object):
"""A set of objects that doesn't keep its elements alive.
The objects in the set must be weakly referencable.
The objects need not be hashable, and need not support comparison.
Two objects are considered to be the same iff their id()s are equal.
When the only references to an object are weak references (including
those from WeakSets), the object can be garbage-collected, and
will vanish from any WeakSets it may be a member of at that time.
"""
def __init__(self):
# Map id(obj) to obj. By using ids as keys, we avoid requiring
# that the elements be hashable or comparable.
self.data = weakref.WeakValueDictionary()
# Same as a Set, add obj to the collection.
def add(self, obj):
self.data[id(obj)] = obj
# Same as a Set, remove obj from the collection, and raise
# KeyError if obj not in the collection.
def remove(self, obj):
del self.data[id(obj)]
# Return a list of all the objects in the collection.
# Because a weak dict is used internally, iteration
# is dicey (the underlying dict may change size during
# iteration, due to gc or activity from other threads).
# as_list() attempts to be safe.
def as_list(self):
return self.data.values()
class TransactionManager(object): class TransactionManager(object):
def __init__(self): def __init__(self):
from ZODB.utils import WeakSet
self._txn = None self._txn = None
self._synchs = WeakSet() self._synchs = WeakSet()
...@@ -135,6 +102,8 @@ class ThreadTransactionManager(object): ...@@ -135,6 +102,8 @@ class ThreadTransactionManager(object):
del self._txns[tid] del self._txns[tid]
def registerSynch(self, synch): def registerSynch(self, synch):
from ZODB.utils import WeakSet
tid = thread.get_ident() tid = thread.get_ident()
ws = self._synchs.get(tid) ws = self._synchs.get(tid)
if ws is None: if ws is None:
......
...@@ -261,9 +261,10 @@ class Transaction(object): ...@@ -261,9 +261,10 @@ class Transaction(object):
self._resources.append(adapter) self._resources.append(adapter)
def begin(self): def begin(self):
warnings.warn("Transaction.begin() should no longer be used; use " from ZODB.utils import deprecated36
"the begin() method of a transaction manager.",
DeprecationWarning, stacklevel=2) deprecated36("Transaction.begin() should no longer be used; use "
"the begin() method of a transaction manager.")
if (self._resources or if (self._resources or
self._sub or self._sub or
self._nonsub or self._nonsub or
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment