Commit 0bd33210 authored by Tim Peters's avatar Tim Peters

Port rev 26199 from the 3.3 branch.

Zope3-dev Collector #139: Memory leak involving buckets and connections

Connection objects were typically immortal because the threaded
transaction manager kept them in ever-growing lists.  Reworked the
transaction manager internals to use a simple implementation of weak sets
instead.  This plugs all leaks in the test program attached to the
collector report (which was leaking about 100KB/sec on my box).
parent a3252542
...@@ -2,6 +2,21 @@ What's new in ZODB3 3.3 ...@@ -2,6 +2,21 @@ What's new in ZODB3 3.3
======================= =======================
Release date: DD-MMM-2004 Release date: DD-MMM-2004
Transaction Managers
--------------------
Zope3-dev Collector #139: Memory leak involving buckets and connections
The transaction manager internals effectively made every Connection
object immortal, except for those explicitly closed. Since typical
practice is not to close connections explicitly (and closing a DB
happens not to close the connections to it -- although that may
change), this caused massive memory leaks when many connections were
opened. The transaction manager internals were reworked to use weak
references instead, so that connection memory (and other registered
synch objects) now get cleaned up when nothing other than the
transaction manager knows about them.
Storages Storages
-------- --------
......
...@@ -18,24 +18,70 @@ are associated with the right transaction. ...@@ -18,24 +18,70 @@ are associated with the right transaction.
""" """
import thread import thread
import weakref
from transaction._transaction import Transaction from transaction._transaction import Transaction
# We have to remember sets of synch objects, especially Connections.
# But we don't want mere registration with a transaction manager to
# keep a synch object alive forever; in particular, it's common
# practice not to explicitly close Connection objects, and keeping
# a Connection alive keeps a potentially huge number of other objects
# alive (e.g., the cache, and everything reachable from it too).
#
# Therefore we use "weak sets" internally. The implementation here
# implements just enough of Python's sets.Set interface for our needs.
class WeakSet(object):
"""A set of objects that doesn't keep its elements alive.
The objects in the set must be weakly referencable.
The objects need not be hashable, and need not support comparison.
Two objects are considered to be the same iff their id()s are equal.
When the only references to an object are weak references (including
those from WeakSets), the object can be garbage-collected, and
will vanish from any WeakSets it may be a member of at that time.
"""
def __init__(self):
# Map id(obj) to obj. By using ids as keys, we avoid requiring
# that the elements be hashable or comparable.
self.data = weakref.WeakValueDictionary()
# Same as a Set, add obj to the collection.
def add(self, obj):
self.data[id(obj)] = obj
# Same as a Set, remove obj from the collection, and raise
# KeyError if obj not in the collection.
def remove(self, obj):
del self.data[id(obj)]
# Return a list of all the objects in the collection.
# Because a weak dict is used internally, iteration
# is dicey (the underlying dict may change size during
# iteration, due to gc or activity from other threads).
# as_list() attempts to be safe.
def as_list(self):
return self.data.values()
class TransactionManager(object): class TransactionManager(object):
def __init__(self): def __init__(self):
self._txn = None self._txn = None
self._synchs = [] self._synchs = WeakSet()
def begin(self): def begin(self):
if self._txn is not None: if self._txn is not None:
self._txn.abort() self._txn.abort()
self._txn = Transaction(self._synchs, self) self._txn = Transaction(self._synchs.as_list(), self)
return self._txn return self._txn
def get(self): def get(self):
if self._txn is None: if self._txn is None:
self._txn = Transaction(self._synchs, self) self._txn = Transaction(self._synchs.as_list(), self)
return self._txn return self._txn
def free(self, txn): def free(self, txn):
...@@ -43,7 +89,7 @@ class TransactionManager(object): ...@@ -43,7 +89,7 @@ class TransactionManager(object):
self._txn = None self._txn = None
def registerSynch(self, synch): def registerSynch(self, synch):
self._synchs.append(synch) self._synchs.add(synch)
def unregisterSynch(self, synch): def unregisterSynch(self, synch):
self._synchs.remove(synch) self._synchs.remove(synch)
...@@ -57,9 +103,9 @@ class ThreadTransactionManager(object): ...@@ -57,9 +103,9 @@ class ThreadTransactionManager(object):
def __init__(self): def __init__(self):
# _threads maps thread ids to transactions # _threads maps thread ids to transactions
self._txns = {} self._txns = {}
# _synchs maps a thread id to a list of registered synchronizers. # _synchs maps a thread id to a WeakSet of registered synchronizers.
# The list is passed to the Transaction constructor, because # The set elements are passed to the Transaction constructor,
# it needs to call the synchronizers when it commits. # because the latter needs to call the synchronizers when it commits.
self._synchs = {} self._synchs = {}
def begin(self): def begin(self):
...@@ -67,14 +113,20 @@ class ThreadTransactionManager(object): ...@@ -67,14 +113,20 @@ class ThreadTransactionManager(object):
txn = self._txns.get(tid) txn = self._txns.get(tid)
if txn is not None: if txn is not None:
txn.abort() txn.abort()
txn = self._txns[tid] = Transaction(self._synchs.get(tid), self) synchs = self._synchs.get(tid)
if synchs is not None:
synchs = synchs.as_list()
txn = self._txns[tid] = Transaction(synchs, self)
return txn return txn
def get(self): def get(self):
tid = thread.get_ident() tid = thread.get_ident()
txn = self._txns.get(tid) txn = self._txns.get(tid)
if txn is None: if txn is None:
txn = self._txns[tid] = Transaction(self._synchs.get(tid), self) synchs = self._synchs.get(tid)
if synchs is not None:
synchs = synchs.as_list()
txn = self._txns[tid] = Transaction(synchs, self)
return txn return txn
def free(self, txn): def free(self, txn):
...@@ -84,10 +136,12 @@ class ThreadTransactionManager(object): ...@@ -84,10 +136,12 @@ class ThreadTransactionManager(object):
def registerSynch(self, synch): def registerSynch(self, synch):
tid = thread.get_ident() tid = thread.get_ident()
L = self._synchs.setdefault(tid, []) ws = self._synchs.get(tid)
L.append(synch) if ws is None:
ws = self._synchs[tid] = WeakSet()
ws.add(synch)
def unregisterSynch(self, synch): def unregisterSynch(self, synch):
tid = thread.get_ident() tid = thread.get_ident()
L = self._synchs.get(tid) ws = self._synchs[tid]
L.remove(synch) ws.remove(synch)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment