Commit 847369b1 authored by Tim Peters's avatar Tim Peters

Switch DB to using a reentrant lock for its methods.

This allows more extensive use of the _connectionMap() method.
parent 1f669d95
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
$Id$""" $Id$"""
import cPickle, cStringIO, sys import cPickle, cStringIO, sys
from thread import allocate_lock import threading
from time import time, ctime from time import time, ctime
import warnings import warnings
import logging import logging
...@@ -113,7 +113,7 @@ class _ConnectionPool(object): ...@@ -113,7 +113,7 @@ class _ConnectionPool(object):
# Throw away the oldest available connections until we're under our # Throw away the oldest available connections until we're under our
# target size (strictly_less=False) or no more than that (strictly_less= # target size (strictly_less=False) or no more than that (strictly_less=
# True, the default). It may not be possible to achieve this. # True, the default).
def _reduce_size(self, strictly_less=False): def _reduce_size(self, strictly_less=False):
target = self.pool_size - bool(strictly_less) target = self.pool_size - bool(strictly_less)
while len(self.available) > target: while len(self.available) > target:
...@@ -207,10 +207,10 @@ class DB(object): ...@@ -207,10 +207,10 @@ class DB(object):
- `cache_deactivate_after`: ignored - `cache_deactivate_after`: ignored
- `version_cache_deactivate_after`: ignored - `version_cache_deactivate_after`: ignored
""" """
# Allocate locks: # Allocate lock.
l = allocate_lock() x = threading.RLock()
self._a = l.acquire self._a = x.acquire
self._r = l.release self._r = x.release
# Setup connection pools and cache info # Setup connection pools and cache info
# _pools maps a version string to a _ConnectionPool object. # _pools maps a version string to a _ConnectionPool object.
...@@ -314,12 +314,12 @@ class DB(object): ...@@ -314,12 +314,12 @@ class DB(object):
""" """
detail = {} detail = {}
def f(con, detail=detail, have_detail=detail.has_key): def f(con, detail=detail):
for oid, ob in con._cache.items(): for oid, ob in con._cache.items():
module = getattr(ob.__class__, '__module__', '') module = getattr(ob.__class__, '__module__', '')
module = module and '%s.' % module or '' module = module and '%s.' % module or ''
c = "%s%s" % (module, ob.__class__.__name__) c = "%s%s" % (module, ob.__class__.__name__)
if have_detail(c): if c in detail:
detail[c] += 1 detail[c] += 1
else: else:
detail[c] = 1 detail[c] = 1
...@@ -397,9 +397,9 @@ class DB(object): ...@@ -397,9 +397,9 @@ class DB(object):
def cacheDetailSize(self): def cacheDetailSize(self):
m = [] m = []
def f(con, m=m): def f(con, m=m):
m.append({'connection':repr(con), m.append({'connection': repr(con),
'ngsize':con._cache.cache_non_ghost_count, 'ngsize': con._cache.cache_non_ghost_count,
'size':len(con._cache)}) 'size': len(con._cache)})
self._connectionMap(f) self._connectionMap(f)
m.sort() m.sort()
return m return m
...@@ -462,19 +462,18 @@ class DB(object): ...@@ -462,19 +462,18 @@ class DB(object):
if o is not None and o[0]==oid: if o is not None and o[0]==oid:
del self._miv_cache[h] del self._miv_cache[h]
# Notify connections # Notify connections.
for pool in self._pools.values(): def inval(c):
for cc in pool.all_as_list(): if (c is not connection and
if (cc is not connection and (not version or c._version == version)):
(not version or cc._version == version)): c.invalidate(tid, oids)
cc.invalidate(tid, oids) self._connectionMap(inval)
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
h = hash(oid) % 131 h = hash(oid) % 131
cache = self._miv_cache cache = self._miv_cache
o=cache.get(h, None) o = cache.get(h, None)
if o and o[0]==oid: if o and o[0] == oid:
return o[1] return o[1]
v = self._storage.modifiedInVersion(oid) v = self._storage.modifiedInVersion(oid)
cache[h] = oid, v cache[h] = oid, v
...@@ -532,15 +531,16 @@ class DB(object): ...@@ -532,15 +531,16 @@ class DB(object):
else: else:
size = self._pool_size size = self._pool_size
self._pools[version] = pool = _ConnectionPool(size) self._pools[version] = pool = _ConnectionPool(size)
assert pool is not None
# result <- a connection # result <- a connection
result = pool.pop() result = pool.pop()
if result is None: if result is None:
if version: if version:
cache = self._version_cache_size size = self._version_cache_size
else: else:
cache = self._cache_size size = self._cache_size
c = self.klass(version=version, cache_size=cache, c = self.klass(version=version, cache_size=size,
mvcc=mvcc, txn_mgr=txn_mgr) mvcc=mvcc, txn_mgr=txn_mgr)
pool.push(c) pool.push(c)
result = pool.pop() result = pool.pop()
...@@ -550,9 +550,7 @@ class DB(object): ...@@ -550,9 +550,7 @@ class DB(object):
result._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch) result._setDB(self, mvcc=mvcc, txn_mgr=txn_mgr, synch=synch)
# A good time to do some cache cleanup. # A good time to do some cache cleanup.
for pool in self._pools.itervalues(): self._connectionMap(lambda c: c.cacheGC())
for c in pool.all_as_list():
c.cacheGC()
return result return result
...@@ -568,23 +566,23 @@ class DB(object): ...@@ -568,23 +566,23 @@ class DB(object):
def connectionDebugInfo(self): def connectionDebugInfo(self):
result = [] result = []
t = time() t = time()
for version, pool in self._pools.items(): def f(c):
for c in pool.all_as_list(): o = c._opened
o = c._opened d = c._debug_info
d = c._debug_info if d:
if d: if len(d) == 1:
if len(d) == 1: d = d[0]
d = d[0] else:
else: d = ''
d = '' d = "%s (%s)" % (d, len(c._cache))
d = "%s (%s)" % (d, len(c._cache))
result.append({
result.append({ 'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)),
'opened': o and ("%s (%.2fs)" % (ctime(o), t-o)), 'info': d,
'info': d, 'version': version,
'version': version, })
})
self._connectionMap(f)
return result return result
def getActivityMonitor(self): def getActivityMonitor(self):
...@@ -622,18 +620,26 @@ class DB(object): ...@@ -622,18 +620,26 @@ class DB(object):
return find_global(modulename, globalname) return find_global(modulename, globalname)
def setCacheSize(self, v): def setCacheSize(self, v):
self._cache_size = v self._a()
pool = self._pools.get('') try:
if pool is not None: self._cache_size = v
for c in pool.all_as_list(): pool = self._pools.get('')
c._cache.cache_size = v if pool is not None:
def setVersionCacheSize(self, v):
self._version_cache_size = v
for version, pool in self._pools.items():
if version:
for c in pool.all_as_list(): for c in pool.all_as_list():
c._cache.cache_size = v c._cache.cache_size = v
finally:
self._r()
def setVersionCacheSize(self, v):
self._a()
try:
self._version_cache_size = v
for version, pool in self._pools.items():
if version:
for c in pool.all_as_list():
c._cache.cache_size = v
finally:
self._r()
def setPoolSize(self, size): def setPoolSize(self, size):
self._pool_size = size self._pool_size = size
...@@ -649,7 +655,6 @@ class DB(object): ...@@ -649,7 +655,6 @@ class DB(object):
for version, pool in self._pools.items(): for version, pool in self._pools.items():
if (version != '') == for_versions: if (version != '') == for_versions:
pool.set_pool_size(size) pool.set_pool_size(size)
finally: finally:
self._r() self._r()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment