Commit beacdb4e authored by Tim Peters's avatar Tim Peters

Rewrote DB to use new _ConnectionPool class.

Most relevant code is much simpler now; there's no longer
a hard limit on the # of connections per DB.  All the
tests pass, but test coverage of the sprawling DB API isn't
very good.
parent 573449f9
...@@ -50,11 +50,11 @@ class _ConnectionPool(object): ...@@ -50,11 +50,11 @@ class _ConnectionPool(object):
When a connection is explicitly closed, tell the pool via repush(). When a connection is explicitly closed, tell the pool via repush().
That adds the connection to a stack of connections available for That adds the connection to a stack of connections available for
reuse, and throws away the oldest stack entries if the pool is too large. reuse, and throws away the oldest stack entries if the pool is too large.
get() pops this stack. pop() pops this stack.
If a connection is obtained via get(), the pool holds only a weak When a connection is obtained via pop(), the pool holds only a weak
reference to it thereafter. It's not necessary to inform the pool reference to it thereafter. It's not necessary to inform the pool
if the connection goes away. A connection handed out by get() counts if the connection goes away. A connection handed out by pop() counts
against pool_size only so long as it exists, and provided it isn't against pool_size only so long as it exists, and provided it isn't
repush()'ed. repush()'ed.
""" """
...@@ -94,26 +94,37 @@ class _ConnectionPool(object): ...@@ -94,26 +94,37 @@ class _ConnectionPool(object):
reporter("DB.open() has %s open connections with a pool_size " reporter("DB.open() has %s open connections with a pool_size "
"of %s", n, limit) "of %s", n, limit)
# Reregister an available connection formerly obtained via get(). # Reregister an available connection formerly obtained via pop().
def repush(self, c): def repush(self, c):
assert c in self.all assert c in self.all
assert c not in self.available assert c not in self.available
self._reduce_size() self._reduce_size()
self.available.append(c) self.available.append(c)
# Prior to pushing a connection onto self.available, throw away the # Throw away the oldest available connections until we're under our
# oldest available connections until we're under our # target size. It may not be possible to achieve this.
# It may not be possible to achieve this.
def _reduce_size(self): def _reduce_size(self):
while self.available and len(self.all) >= self.pool_size: while self.available and len(self.all) >= self.pool_size:
c = self.available.pop(0) c = self.available.pop(0)
self.all.remove(c) self.all.remove(c)
def get(self): # The number of available connections.
def num_available(self):
return len(self.available)
# Pop an available connection and return it. A caller must ensure
# that num_available() > 0 before calling pop(), and if it's not,
# create a connection and register it via push() first.
def pop(self):
# Leave it in self.all, so we can still get at it for statistics # Leave it in self.all, so we can still get at it for statistics
# while it's alive. # while it's alive.
return self.available.pop() c = self.available.pop()
assert c in self.all
return c
# Return a list of all connections we currently know about.
def all_as_list(self):
return self.all.as_list()
class DB(object): class DB(object):
...@@ -178,11 +189,11 @@ class DB(object): ...@@ -178,11 +189,11 @@ class DB(object):
- `storage`: the storage used by the database, e.g. FileStorage - `storage`: the storage used by the database, e.g. FileStorage
- `pool_size`: expected maximum number of open connections - `pool_size`: expected maximum number of open connections
- `cache_size`: target size of Connection object cache - `cache_size`: target size of Connection object cache
- `cache_deactivate_after`: ignored
- `version_pool_size`: expected maximum number of connections (per - `version_pool_size`: expected maximum number of connections (per
version) version)
- `version_cache_size`: target size of Connection object cache for - `version_cache_size`: target size of Connection object cache for
version connections version connections
- `cache_deactivate_after`: ignored
- `version_cache_deactivate_after`: ignored - `version_cache_deactivate_after`: ignored
""" """
# Allocate locks: # Allocate locks:
...@@ -191,8 +202,8 @@ class DB(object): ...@@ -191,8 +202,8 @@ class DB(object):
self._r = l.release self._r = l.release
# Setup connection pools and cache info # Setup connection pools and cache info
self._pools = {},[] # _pools maps a version string to a _ConnectionPool object.
self._temps = [] self._pools = {}
self._pool_size = pool_size self._pool_size = pool_size
self._cache_size = cache_size self._cache_size = cache_size
self._version_pool_size = version_pool_size self._version_pool_size = version_pool_size
...@@ -252,10 +263,10 @@ class DB(object): ...@@ -252,10 +263,10 @@ class DB(object):
am = self._activity_monitor am = self._activity_monitor
if am is not None: if am is not None:
am.closedConnection(connection) am.closedConnection(connection)
version = connection._version version = connection._version
pools, pooll = self._pools
try: try:
pool, allocated, pool_lock = pools[version] pool = self._pools[version]
except KeyError: except KeyError:
# No such version. We must have deleted the pool. # No such version. We must have deleted the pool.
# Just let the connection go. # Just let the connection go.
...@@ -264,30 +275,18 @@ class DB(object): ...@@ -264,30 +275,18 @@ class DB(object):
# XXX What objects are involved in the cycle? # XXX What objects are involved in the cycle?
connection.__dict__.clear() connection.__dict__.clear()
return return
pool.repush(connection)
pool.append(connection)
if len(pool) == 1:
# Pool now usable again, unlock it.
pool_lock.release()
finally: finally:
self._r() self._r()
# Call f(c) for all connections in all pools in all versions.
def _connectionMap(self, f): def _connectionMap(self, f):
self._a() self._a()
try: try:
pools, pooll = self._pools for pool in self._pools.itervalues():
for pool, allocated in pooll: for c in pool.all_as_list():
for cc in allocated: f(c)
f(cc)
temps = self._temps
if temps:
t = []
rc = sys.getrefcount
for cc in temps:
if rc(cc) > 3:
f(cc)
self._temps = t
finally: finally:
self._r() self._r()
...@@ -445,7 +444,6 @@ class DB(object): ...@@ -445,7 +444,6 @@ class DB(object):
if connection is not None: if connection is not None:
version = connection._version version = connection._version
# Update modified in version cache # Update modified in version cache
# XXX must make this work with list or dict to backport to 2.6
for oid in oids.keys(): for oid in oids.keys():
h = hash(oid) % 131 h = hash(oid) % 131
o = self._miv_cache.get(h, None) o = self._miv_cache.get(h, None)
...@@ -453,25 +451,12 @@ class DB(object): ...@@ -453,25 +451,12 @@ class DB(object):
del self._miv_cache[h] del self._miv_cache[h]
# Notify connections # Notify connections
for pool, allocated in self._pools[1]: for pool in self._pools.values():
for cc in allocated: for cc in pool.all_as_list():
if (cc is not connection and
(not version or cc._version==version)):
if sys.getrefcount(cc) <= 3:
cc.close()
cc.invalidate(tid, oids)
if self._temps:
t = []
for cc in self._temps:
if sys.getrefcount(cc) > 3:
if (cc is not connection and if (cc is not connection and
(not version or cc._version == version)): (not version or cc._version == version)):
cc.invalidate(tid, oids) cc.invalidate(tid, oids)
t.append(cc)
else:
cc.close()
self._temps = t
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
h = hash(oid) % 131 h = hash(oid) % 131
...@@ -495,11 +480,6 @@ class DB(object): ...@@ -495,11 +480,6 @@ class DB(object):
The optional `version` argument can be used to specify that a The optional `version` argument can be used to specify that a
version connection is desired. version connection is desired.
The optional transaction argument can be provided to cause the
connection to be automatically closed when a transaction is
terminated. In addition, connections per transaction are
reused, if possible.
Note that the connection pool is managed as a stack, to Note that the connection pool is managed as a stack, to
increase the likelihood that the connection's stack will increase the likelihood that the connection's stack will
include useful objects. include useful objects.
...@@ -532,142 +512,66 @@ class DB(object): ...@@ -532,142 +512,66 @@ class DB(object):
self._a() self._a()
try: try:
pools, pooll = self._pools # pool <- the _ConnectionPool for this version
pool = self._pools.get(version)
# pools is a mapping object: if pool is None:
# if version:
# {version -> (pool, allocated, lock) size = self._version_pool_size
#
# where:
#
# pool is the connection pool for the version,
# allocated is a list of all of the allocated
# connections, and
# lock is a lock that is used to block when a pool is
# empty and no more connections can be allocated.
#
# pooll is a list of all of the pools and allocated for
# use in cases where we need to iterate over all
# connections or all inactive connections.
# Pool locks are tricky. Basically, the lock needs to be
# set whenever the pool becomes empty so that threads are
# forced to wait until the pool gets a connection in it.
# The lock is acquired when the (empty) pool is
# created. The lock is acquired just prior to removing
# the last connection from the pool and released just after
# adding a connection to an empty pool.
if pools.has_key(version):
pool, allocated, pool_lock = pools[version]
else: else:
pool, allocated, pool_lock = pools[version] = ( size = self._pool_size
[], [], allocate_lock()) self._pools[version] = pool = _ConnectionPool(size)
pooll.append((pool, allocated))
pool_lock.acquire()
if not pool: # result <- a connection
c = None if pool.num_available() == 0:
if version: if version:
if self._version_pool_size > len(allocated): cache = self._version_cache_size
c = self.klass(version=version, else:
cache_size=self._version_cache_size, cache = self._cache_size
c = self.klass(version=version, cache_size=cache,
mvcc=mvcc, txn_mgr=txn_mgr) mvcc=mvcc, txn_mgr=txn_mgr)
allocated.append(c) pool.push(c)
pool.append(c) result = pool.pop()
elif self._pool_size > len(allocated):
c = self.klass(version=version,
cache_size=self._cache_size,
mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
allocated.append(c)
pool.append(c)
if c is None:
self._r()
pool_lock.acquire()
self._a()
if len(pool) > 1:
# Note that the pool size will normally be 1 here,
# but it could be higher due to a race condition.
pool_lock.release()
elif len(pool)==1:
# Taking last one, lock the pool.
# Note that another thread might grab the lock
# before us, so we might actually block, however,
# when we get the lock back, there *will* be a
# connection in the pool. OTOH, there's no limit on
# how long we may need to wait: if the other thread
# grabbed the lock in this section too, we'll wait
# here until another connection is closed.
# checkConcurrentUpdates1Storage provoked this frequently
# on a hyperthreaded machine, with its second thread
# timing out after waiting 5 minutes for DB.open() to
# return. So, if we can't get the pool lock immediately,
# now we make a recursive call. This allows the current
# thread to allocate a new connection instead of waiting
# arbitrarily long for the single connection in the pool
# right now.
self._r()
if not pool_lock.acquire(0):
result = DB.open(self, version, transaction, temporary,
force, waitflag)
self._a()
return result
self._a()
if len(pool) > 1:
# Note that the pool size will normally be 1 here,
# but it could be higher due to a race condition.
pool_lock.release()
c = pool.pop() # Tell the connection it belongs to self.
c._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch) result._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
for pool, allocated in pooll:
for cc in pool:
cc.cacheGC()
return c # A good time to do some cache cleanup.
for pool in self._pools.itervalues():
for c in pool.all_as_list():
c.cacheGC()
return result
finally: finally:
self._r() self._r()
def removeVersionPool(self, version): def removeVersionPool(self, version):
pools, pooll = self._pools
info = pools.get(version)
if info:
del pools[version]
pool, allocated, pool_lock = info
pooll.remove((pool, allocated))
try: try:
pool_lock.release() del self._pools[version]
except: # XXX Do we actually expect this to fail? except KeyError:
pass pass
del pool[:]
del allocated[:]
def connectionDebugInfo(self): def connectionDebugInfo(self):
r = [] result = []
pools, pooll = self._pools
t = time() t = time()
for version, (pool, allocated, lock) in pools.items(): for version, pool in self._pools.items():
for c in allocated: for c in pool.all_as_list():
o = c._opened o = c._opened
d = c._debug_info d = c._debug_info
if d: if d:
if len(d)==1: if len(d) == 1:
d = d[0] d = d[0]
else: else:
d='' d = ''
d = "%s (%s)" % (d, len(c._cache)) d = "%s (%s)" % (d, len(c._cache))
r.append({ result.append({
'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)), 'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
'info': d, 'info': d,
'version': version, 'version': version,
}) })
return r
return result
def getActivityMonitor(self): def getActivityMonitor(self):
return self._activity_monitor return self._activity_monitor
...@@ -696,33 +600,44 @@ class DB(object): ...@@ -696,33 +600,44 @@ class DB(object):
logger.error("packing", exc_info=True) logger.error("packing", exc_info=True)
raise raise
def setCacheSize(self, v): def setActivityMonitor(self, am):
self._cache_size = v self._activity_monitor = am
d = self._pools[0]
pool_info = d.get('')
if pool_info is not None:
for c in pool_info[1]:
c._cache.cache_size = v
def classFactory(self, connection, modulename, globalname): def classFactory(self, connection, modulename, globalname):
# Zope will rebind this method to arbitrary user code at runtime. # Zope will rebind this method to arbitrary user code at runtime.
return find_global(modulename, globalname) return find_global(modulename, globalname)
def setPoolSize(self, v): def setCacheSize(self, v):
self._pool_size = v self._cache_size = v
pool = self._pools.get('')
def setActivityMonitor(self, am): if pool is not None:
self._activity_monitor = am for c in pool.all_as_list():
c._cache.cache_size = v
def setVersionCacheSize(self, v): def setVersionCacheSize(self, v):
self._version_cache_size = v self._version_cache_size = v
for ver in self._pools[0].keys(): for version, pool in self._pools.items():
if ver: if version:
for c in self._pools[0][ver][1]: for c in pool.all_as_list():
c._cache.cache_size = v c._cache.cache_size = v
def setVersionPoolSize(self, v): def setPoolSize(self, size):
self._version_pool_size=v self._pool_size = size
self._reset_pool_sizes(size, for_versions=False)
def setVersionPoolSize(self, size):
self._version_pool_size = size
self._reset_pool_sizes(size, for_versions=True)
def _reset_pool_sizes(self, size, for_versions=False):
self._a()
try:
for version, pool in self._pools.items():
if (version != '') == for_versions:
pool.set_pool_size(size)
finally:
self._r()
def undo(self, id, txn=None): def undo(self, id, txn=None):
"""Undo a transaction identified by id. """Undo a transaction identified by id.
......
...@@ -23,6 +23,10 @@ import ZODB.FileStorage ...@@ -23,6 +23,10 @@ import ZODB.FileStorage
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
# Return total number of connections across all pools in a db._pools.
def nconn(pools):
return sum([len(pool.all) for pool in pools.values()])
class DBTests(unittest.TestCase): class DBTests(unittest.TestCase):
def setUp(self): def setUp(self):
...@@ -75,22 +79,22 @@ class DBTests(unittest.TestCase): ...@@ -75,22 +79,22 @@ class DBTests(unittest.TestCase):
c12.close() # return to pool c12.close() # return to pool
self.assert_(c1 is c12) # should be same self.assert_(c1 is c12) # should be same
pools, pooll = self.db._pools pools = self.db._pools
self.assertEqual(len(pools), 3) self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3) self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1') self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2) self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2) self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1') c12 = self.db.open('v1')
c12.close() # return to pool c12.close() # return to pool
self.assert_(c1 is not c12) # should be different self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3) self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3) self.assertEqual(nconn(pools), 3)
def _test_for_leak(self): def _test_for_leak(self):
self.dowork() self.dowork()
...@@ -112,27 +116,27 @@ class DBTests(unittest.TestCase): ...@@ -112,27 +116,27 @@ class DBTests(unittest.TestCase):
c12 = self.db.open('v1') c12 = self.db.open('v1')
self.assert_(c1 is c12) # should be same self.assert_(c1 is c12) # should be same
pools, pooll = self.db._pools pools = self.db._pools
self.assertEqual(len(pools), 3) self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3) self.assertEqual(nconn(pools), 3)
self.db.removeVersionPool('v1') self.db.removeVersionPool('v1')
self.assertEqual(len(pools), 2) self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2) self.assertEqual(nconn(pools), 2)
c12.close() # should leave pools alone c12.close() # should leave pools alone
self.assertEqual(len(pools), 2) self.assertEqual(len(pools), 2)
self.assertEqual(len(pooll), 2) self.assertEqual(nconn(pools), 2)
c12 = self.db.open('v1') c12 = self.db.open('v1')
c12.close() # return to pool c12.close() # return to pool
self.assert_(c1 is not c12) # should be different self.assert_(c1 is not c12) # should be different
self.assertEqual(len(pools), 3) self.assertEqual(len(pools), 3)
self.assertEqual(len(pooll), 3) self.assertEqual(nconn(pools), 3)
def test_suite(): def test_suite():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment