Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Z
ZODB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Nicolas Wavrant
ZODB
Commits
1a896e9c
Commit
1a896e9c
authored
Oct 30, 2008
by
Jim Fulton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Refactored the connection pool implementation. (I have a feeling that
it could be made simpler still.)
parent
6c88dc5c
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
129 additions
and
283 deletions
+129
-283
src/ZODB/DB.py
src/ZODB/DB.py
+83
-120
src/ZODB/historical_connections.txt
src/ZODB/historical_connections.txt
+42
-156
src/ZODB/tests/dbopen.txt
src/ZODB/tests/dbopen.txt
+4
-4
src/ZODB/tests/testhistoricalconnections.py
src/ZODB/tests/testhistoricalconnections.py
+0
-3
No files found.
src/ZODB/DB.py
View file @
1a896e9c
...
...
@@ -103,22 +103,21 @@ class AbstractConnectionPool(object):
def
getTimeout
(
self
):
return
self
.
_timeout
timeout
=
property
(
getTimeout
,
setTimeout
)
timeout
=
property
(
getTimeout
,
lambda
self
,
v
:
self
.
setTimeout
(
v
)
)
size
=
property
(
getSize
,
setSize
)
size
=
property
(
getSize
,
lambda
self
,
v
:
self
.
setSize
(
v
)
)
class
ConnectionPool
(
AbstractConnectionPool
):
def
__init__
(
self
,
size
,
timeout
=
None
):
def
__init__
(
self
,
size
,
timeout
=
time
()
):
super
(
ConnectionPool
,
self
).
__init__
(
size
,
timeout
)
# A stack of connections available to hand out. This is a subset
# of self.all. push() and repush() add to this, and may remove
# the oldest available connections if the pool is too large.
# pop() pops this stack. There are never more than size entries
# in this stack. The keys are time.time() values of the push or
# repush calls.
self
.
available
=
BTrees
.
OOBTree
.
Bucket
()
# in this stack.
self
.
available
=
[]
def
push
(
self
,
c
):
"""Register a new available connection.
...
...
@@ -127,10 +126,10 @@ class ConnectionPool(AbstractConnectionPool):
stack even if we're over the pool size limit.
"""
assert
c
not
in
self
.
all
assert
c
not
in
self
.
available
.
values
()
assert
c
not
in
self
.
available
self
.
_reduce_size
(
strictly_less
=
True
)
self
.
all
.
add
(
c
)
self
.
available
[
time
()]
=
c
self
.
available
.
append
((
time
(),
c
))
n
=
len
(
self
.
all
)
limit
=
self
.
size
if
n
>
limit
:
...
...
@@ -147,43 +146,42 @@ class ConnectionPool(AbstractConnectionPool):
older available connections.
"""
assert
c
in
self
.
all
assert
c
not
in
self
.
available
.
values
()
assert
c
not
in
self
.
available
self
.
_reduce_size
(
strictly_less
=
True
)
self
.
available
[
time
()]
=
c
self
.
available
.
append
((
time
(),
c
))
def
_reduce_size
(
self
,
strictly_less
=
False
):
"""Throw away the oldest available connections until we're under our
target size (strictly_less=False, the default) or no more than that
(strictly_less=True).
"""
if
self
.
timeout
is
None
:
threshhold
=
None
else
:
threshhold
=
time
()
-
self
.
timeout
threshhold
=
time
()
-
self
.
timeout
target
=
self
.
size
if
strictly_less
:
target
-=
1
for
t
,
c
in
list
(
self
.
available
.
items
()):
if
(
len
(
self
.
available
)
>
target
or
threshhold
is
not
None
and
t
<
threshhold
):
del
self
.
available
[
t
]
self
.
all
.
remove
(
c
)
# While application code may still hold a reference to `c`,
# there's little useful that can be done with this Connection
# anymore. Its cache may be holding on to limited resources,
# and we replace the cache with an empty one now so that we
# don't have to wait for gc to reclaim it. Note that it's not
# possible for DB.open() to return `c` again: `c` can never be
# in an open state again.
# TODO: Perhaps it would be better to break the reference
# cycles between `c` and `c._cache`, so that refcounting
# reclaims both right now. But if user code _does_ have a
# strong reference to `c` now, breaking the cycle would not
# reclaim `c` now, and `c` would be left in a user-visible
# crazy state.
c
.
_resetCache
()
else
:
break
available
=
self
.
available
while
(
(
len
(
available
)
>
target
)
or
(
available
and
available
[
0
][
0
]
<
threshhold
)
):
t
,
c
=
available
.
pop
(
0
)
self
.
all
.
remove
(
c
)
# While application code may still hold a reference to `c`,
# there's little useful that can be done with this Connection
# anymore. Its cache may be holding on to limited resources,
# and we replace the cache with an empty one now so that we
# don't have to wait for gc to reclaim it. Note that it's not
# possible for DB.open() to return `c` again: `c` can never be
# in an open state again.
# TODO: Perhaps it would be better to break the reference
# cycles between `c` and `c._cache`, so that refcounting
# reclaims both right now. But if user code _does_ have a
# strong reference to `c` now, breaking the cycle would not
# reclaim `c` now, and `c` would be left in a user-visible
# crazy state.
c
.
_resetCache
()
def
reduce_size
(
self
):
self
.
_reduce_size
()
...
...
@@ -197,7 +195,7 @@ class ConnectionPool(AbstractConnectionPool):
"""
result
=
None
if
self
.
available
:
result
=
self
.
available
.
pop
(
self
.
available
.
maxKey
()
)
_
,
result
=
self
.
available
.
pop
(
)
# Leave it in self.all, so we can still get at it for statistics
# while it's alive.
assert
result
in
self
.
all
...
...
@@ -212,19 +210,15 @@ class ConnectionPool(AbstractConnectionPool):
If a connection is no longer viable because it has timed out, it is
garbage collected."""
if
self
.
timeout
is
None
:
threshhold
=
None
else
:
threshhold
=
time
()
-
self
.
timeout
for
t
,
c
in
tuple
(
self
.
available
.
items
()):
if
threshhold
is
not
None
and
t
<
threshhold
:
threshhold
=
time
()
-
self
.
timeout
for
t
,
c
in
list
(
self
.
available
):
if
t
<
threshhold
:
del
self
.
available
[
t
]
self
.
all
.
remove
(
c
)
c
.
_resetCache
()
else
:
c
.
cacheGC
()
class
KeyedConnectionPool
(
AbstractConnectionPool
):
# this pool keeps track of keyed connections all together. It makes
# it possible to make assertions about total numbers of keyed connections.
...
...
@@ -233,99 +227,68 @@ class KeyedConnectionPool(AbstractConnectionPool):
# see the comments in ConnectionPool for method descriptions.
def
__init__
(
self
,
size
,
timeout
=
None
):
def
__init__
(
self
,
size
,
timeout
=
time
()
):
super
(
KeyedConnectionPool
,
self
).
__init__
(
size
,
timeout
)
# key: {time.time: connection}
self
.
available
=
BTrees
.
family32
.
OO
.
Bucket
()
# time.time: key
self
.
closed
=
BTrees
.
family32
.
OO
.
Bucket
()
self
.
pools
=
{}
def
setSize
(
self
,
v
):
self
.
_size
=
v
for
pool
in
self
.
pools
.
values
():
pool
.
setSize
(
v
)
def
setTimeout
(
self
,
v
):
self
.
_timeout
=
v
for
pool
in
self
.
pools
.
values
():
pool
.
setTimeout
(
v
)
def
push
(
self
,
c
,
key
):
assert
c
not
in
self
.
all
available
=
self
.
available
.
get
(
key
)
if
available
is
None
:
available
=
self
.
available
[
key
]
=
BTrees
.
family32
.
OO
.
Bucket
()
else
:
assert
c
not
in
available
.
values
()
self
.
_reduce_size
(
strictly_less
=
True
)
self
.
all
.
add
(
c
)
t
=
time
()
available
[
t
]
=
c
self
.
closed
[
t
]
=
key
n
=
len
(
self
.
all
)
limit
=
self
.
size
if
n
>
limit
:
reporter
=
logger
.
warn
if
n
>
2
*
limit
:
reporter
=
logger
.
critical
reporter
(
"DB.open() has %s open connections with a size "
"of %s"
,
n
,
limit
)
pool
=
self
.
pools
.
get
(
key
)
if
pool
is
None
:
pool
=
self
.
pools
[
key
]
=
ConnectionPool
(
self
.
size
,
self
.
timeout
)
pool
.
push
(
c
)
def
repush
(
self
,
c
,
key
):
assert
c
in
self
.
all
self
.
_reduce_size
(
strictly_less
=
True
)
available
=
self
.
available
.
get
(
key
)
if
available
is
None
:
available
=
self
.
available
[
key
]
=
BTrees
.
family32
.
OO
.
Bucket
()
else
:
assert
c
not
in
available
.
values
()
t
=
time
()
available
[
t
]
=
c
self
.
closed
[
t
]
=
key
self
.
pools
[
key
].
repush
(
c
)
def
_reduce_size
(
self
,
strictly_less
=
False
):
if
self
.
timeout
is
None
:
threshhold
=
None
else
:
threshhold
=
time
()
-
self
.
timeout
target
=
self
.
size
if
strictly_less
:
target
-=
1
for
t
,
key
in
tuple
(
self
.
closed
.
items
()):
if
(
len
(
self
.
available
)
>
target
or
threshhold
is
not
None
and
t
<
threshhold
):
del
self
.
closed
[
t
]
c
=
self
.
available
[
key
].
pop
(
t
)
if
not
self
.
available
[
key
]:
del
self
.
available
[
key
]
self
.
all
.
remove
(
c
)
c
.
_resetCache
()
else
:
break
for
key
,
pool
in
list
(
self
.
pools
.
items
()):
pool
.
_reduce_size
(
strictly_less
)
if
not
pool
.
all
:
del
self
.
pools
[
key
]
def
reduce_size
(
self
):
self
.
_reduce_size
()
def
pop
(
self
,
key
):
result
=
None
available
=
self
.
available
.
get
(
key
)
if
available
:
t
=
available
.
maxKey
()
result
=
available
.
pop
(
t
)
del
self
.
closed
[
t
]
if
not
available
:
del
self
.
available
[
key
]
assert
result
in
self
.
all
return
result
pool
=
self
.
pools
.
get
(
key
)
if
pool
is
not
None
:
return
pool
.
pop
()
def
map
(
self
,
f
):
self
.
all
.
map
(
f
)
for
pool
in
self
.
pools
.
itervalues
():
pool
.
map
(
f
)
def
availableGC
(
self
):
if
self
.
timeout
is
None
:
threshhold
=
None
else
:
threshhold
=
time
()
-
self
.
timeout
for
t
,
key
in
tuple
(
self
.
closed
.
items
()):
if
threshhold
is
not
None
and
t
<
threshhold
:
del
self
.
closed
[
t
]
c
=
self
.
available
[
key
].
pop
(
t
)
if
not
self
.
available
[
key
]:
del
self
.
available
[
key
]
self
.
all
.
remove
(
c
)
c
.
_resetCache
()
else
:
self
.
available
[
key
][
t
].
cacheGC
()
for
key
,
pool
in
self
.
pools
.
items
():
pool
.
availableGC
()
if
not
pool
.
all
:
del
self
.
pools
[
key
]
@
property
def
test_all
(
self
):
result
=
set
()
for
pool
in
self
.
pools
.
itervalues
():
result
.
update
(
pool
.
all
)
return
frozenset
(
result
)
@
property
def
test_available
(
self
):
result
=
[]
for
pool
in
self
.
pools
.
itervalues
():
result
.
extend
(
pool
.
available
)
return
tuple
(
result
)
def
toTimeStamp
(
dt
):
utc_struct
=
dt
.
utctimetuple
()
...
...
src/ZODB/historical_connections.txt
View file @
1a896e9c
...
...
@@ -13,11 +13,7 @@ development continues on a "development" head.
A database can be opened historically ``at`` or ``before`` a given transaction
serial or datetime. Here's a simple example. It should work with any storage
that supports ``loadBefore``. Unfortunately that does not include
MappingStorage, so we use a FileStorage instance. Also unfortunately, as of
this writing there is no reliable way to determine if a storage truly
implements loadBefore, or if it simply returns None (as in BaseStorage), other
than reading code.
that supports ``loadBefore``.
We'll begin our example with a fairly standard set up. We
...
...
@@ -28,11 +24,8 @@ We'll begin our example with a fairly standard set up. We
- modify the database again; and
- commit a transaction.
>>> import ZODB.FileStorage
>>> storage = ZODB.FileStorage.FileStorage(
... 'HistoricalConnectionTests.fs', create=True)
>>> import ZODB
>>> db = ZODB.DB(storage)
>>> import ZODB.MappingStorage
>>> db = ZODB.MappingStorage.DB()
>>> conn = db.open()
>>> import persistent.mapping
...
...
@@ -42,14 +35,13 @@ We'll begin our example with a fairly standard set up. We
>>> import transaction
>>> transaction.commit()
We wait for some t
time to pass
, and then make some other changes.
We wait for some t
ime to pass, record he time
, and then make some other changes.
>>> import time
>>> t = time.time()
>>> while time.time() <= t:
... time.sleep(.001)
>>> import datetime
>>> now = datetime.datetime.utcnow()
...
...
@@ -164,186 +156,80 @@ historical connection should be kept.
>>> db.getHistoricalTimeout()
400
All three of these values can be specified in a ZConfig file. We're using
mapping storage for simplicity, but remember, as we said at the start of this
document, mapping storage will not work for historical connections (and in fact
may seem to work but then fail confusingly) because it does not implement
loadBefore.
All three of these values can be specified in a ZConfig file.
>>> import ZODB.config
>>> db2 = ZODB.config.databaseFromString('''
... <zodb>
... <mappingstorage/>
... historical-pool-size
5
... historical-pool-size
3
... historical-cache-size 1500
... historical-timeout 6m
... </zodb>
... ''')
>>> db2.getHistoricalPoolSize()
5
3
>>> db2.getHistoricalCacheSize()
1500
>>> db2.getHistoricalTimeout()
360
Let's actually look at these values at work by shining some light into what
has been a black box up to now. We'll actually do some white box examination
of what is going on in the database, pools and connections.
Historical connections are held in a single connection pool with mappings
from the ``before`` TID to available connections. First we'll put a new
pool on the database so we have a clean slate.
>>> historical_conn.close()
>>> from ZODB.DB import KeyedConnectionPool
>>> db.historical_pool = KeyedConnectionPool(
... db.historical_pool.size, db.historical_pool.timeout)
Now lets look what happens to the pool when we create and close an historical
connection
.
The pool lets us reuse connections. To see this, we'll open some
connection
s, close them, and then open them again:
>>> pool = db.historical_pool
>>> len(pool.all)
0
>>> len(pool.available)
0
>>> historical_conn = db.open(
... transaction_manager=transaction1, before=serial)
>>> len(pool.all)
1
>>> len(pool.available)
0
>>> historical_conn in pool.all
True
>>> historical_conn.close()
>>> len(pool.all)
1
>>> len(pool.available)
1
>>> pool.available.keys()[0] == serial
True
>>> len(pool.available.values()[0])
1
>>> conns1 = [db2.open(before=serial) for i in range(4)]
>>> _ = [c.close() for c in conns1]
>>> conns2 = [db2.open(before=serial) for i in range(4)]
Now
we'll open and close two for the same serial to see what happens to
the
data structures
.
Now
let's look at what we got. The first connection in conns 2 is
the
last connection in conns1, because it was the last connection closed
.
>>> historical_conn is db.open(
... transaction_manager=transaction1, before=serial)
>>> conns2[0] is conns1[-1]
True
>>> len(pool.all)
1
>>> len(pool.available)
0
>>> transaction2 = transaction.TransactionManager()
>>> historical_conn2 = db.open(
... transaction_manager=transaction2, before=serial)
>>> len(pool.all)
2
>>> len(pool.available)
0
>>> historical_conn2.close()
>>> len(pool.all)
2
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
1
>>> historical_conn.close()
>>> len(pool.all)
2
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
2
If you change the historical cache size, that changes the size of the
persistent cache on our connection.
Also for the next two:
>>> historical_conn._cache.cache_size
2000
>>> db.setHistoricalCacheSize(1500)
>>> historical_conn._cache.cache_size
1500
>>> (conns2[1] is conns1[-2]), (conns2[2] is conns1[-3])
(True, True)
Now let's look at pool sizes. We'll set it to two, then open and close three
connections. We should end up with only two available connections.
But not for the last:
>>> db.setHistoricalPoolSize(2)
>>> conns2[3] is conns1[-4]
False
>>> historical_conn = db.open(
... transaction_manager=transaction1, before=serial)
>>> historical_conn2 = db.open(
... transaction_manager=transaction2, before=serial)
>>> transaction3 = transaction.TransactionManager()
>>> historical_conn3 = db.open(
... transaction_manager=transaction3, at=historical_serial)
>>> len(pool.all)
3
>>> len(pool.available)
0
Because the pool size was set to 3.
>>> historical_conn3.close()
>>> len(pool.all)
3
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
1
Connections are also discarded if they haven't been used in a while.
To see this, let's close two of the connections:
>>> historical_conn2.close()
>>> len(pool.all)
3
>>> len(pool.available)
2
>>> len(pool.available.values()[0])
1
>>> len(pool.available.values()[1])
1
>>> conns2[0].close(); conns2[1].close()
>>> historical_conn.close()
>>> len(pool.all)
2
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
2
We'l also set the historical timeout to be very low:
Notice it dumped the one that was closed at the earliest time.
>>> db2.setHistoricalTimeout(.01)
>>> time.sleep(.1)
>>> conns2[2].close(); conns2[3].close()
Finally, we'll look at the timeout. We'll need to monkeypatch ``time`` for
this. (The funky __import__ of DB is because some ZODB __init__ shenanigans
make the DB class mask the DB module.)
Now, when we open 4 connections:
>>> db.getHistoricalTimeout()
400
>>> import time
>>> delta = 200
>>> def stub_time():
... return time.time() + delta
...
>>> DB_module = __import__('ZODB.DB', globals(), locals(), ['chicken'])
>>> original_time = DB_module.time
>>> DB_module.time = stub_time
>>> conns1 = [db2.open(before=serial) for i in range(4)]
>>> historical_conn = db.open(before=serial)
We'll see that only the last 2 connections from conn2 are in the
result:
>>> len(pool.all)
2
>>> len(pool.available)
1
>>> [c in conns1 for c in conns2]
[False, False, True, True]
A close or an open will do garbage collection on the timed out connections.
>>> delta += 200
>>> historical_conn.close()
If you change the historical cache size, that changes the size of the
persistent cache on our connection.
>>> len(pool.all)
1
>>> len(pool.available)
1
>>> len(pool.available.values()[0])
1
>>> historical_conn._cache.cache_size
2000
>>> db.setHistoricalCacheSize(1500)
>>> historical_conn._cache.cache_size
1500
Invalidations
=============
...
...
src/ZODB/tests/dbopen.txt
View file @
1a896e9c
...
...
@@ -239,12 +239,12 @@ Closing connections adds them to the stack:
Closing another one will purge the one with MARKER 0 from the stack
(since it was the first added to the stack):
>>> [c.MARKER for
c in pool.available.values()
]
>>> [c.MARKER for
(t, c) in pool.available
]
[0, 1, 2]
>>> conns[0].close() # MARKER 3
>>> len(pool.available), len(pool.all)
(3, 5)
>>> [c.MARKER for
c in pool.available.values()
]
>>> [c.MARKER for
(t, c) in pool.available
]
[1, 2, 3]
Similarly for the other two:
...
...
@@ -252,7 +252,7 @@ Similarly for the other two:
>>> conns[1].close(); conns[2].close()
>>> len(pool.available), len(pool.all)
(3, 3)
>>> [c.MARKER for
c in pool.available.values()
]
>>> [c.MARKER for
(t, c) in pool.available
]
[3, 4, 5]
Reducing the pool size may also purge the oldest closed connections:
...
...
@@ -260,7 +260,7 @@ Reducing the pool size may also purge the oldest closed connections:
>>> db.setPoolSize(2) # gets rid of MARKER 3
>>> len(pool.available), len(pool.all)
(2, 2)
>>> [c.MARKER for
c in pool.available.values()
]
>>> [c.MARKER for
(t, c) in pool.available
]
[4, 5]
Since MARKER 5 is still the last one added to the stack, it will be the
...
...
src/ZODB/tests/testhistoricalconnections.py
View file @
1a896e9c
...
...
@@ -25,10 +25,7 @@ def setUp(test):
def
tearDown
(
test
):
test
.
globs
[
'db'
].
close
()
test
.
globs
[
'db2'
].
close
()
test
.
globs
[
'storage'
].
close
()
# the DB class masks the module because of __init__ shenanigans
DB_module
=
__import__
(
'ZODB.DB'
,
globals
(),
locals
(),
[
'chicken'
])
DB_module
.
time
=
test
.
globs
[
'original_time'
]
module
.
tearDown
(
test
)
ZODB
.
tests
.
util
.
tearDown
(
test
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment