Commit 1ed49659 authored by Christian Theune's avatar Christian Theune

- Various interface cleanups. ZODB.Connection.Connection is now completely

   documented in an interface.

 - The IDataManager, IDataManagerOrignal and IResourceManager cludge was
   cleaned up to be only IDataManager now which reflects the current 
   situation.
parent fa352cdd
......@@ -252,6 +252,12 @@ class BaseStorage(UndoLogCompatible):
pass
def tpc_finish(self, transaction, f=None):
# It's important that the storage calls the function we pass
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
self._lock_acquire()
try:
if transaction is not self._transaction:
......
......@@ -23,6 +23,12 @@ from time import time
from persistent import PickleCache
# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from transaction.interfaces import IDataManager
from zope.interface import implements
import transaction
from ZODB.ConflictResolution import ResolvedSerial
......@@ -31,12 +37,9 @@ from ZODB.POSException \
import ConflictError, ReadConflictError, InvalidObjectReference, \
ConnectionStateError
from ZODB.TmpStore import TmpStore
from ZODB.utils import u64, oid_repr, z64, positive_id
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
from ZODB.interfaces import IConnection
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
from zope.interface import implements
from ZODB.utils import u64, oid_repr, z64, positive_id, \
DEPRECATED_ARGUMENT, deprecated36
global_reset_counter = 0
......@@ -54,127 +57,19 @@ def resetCaches():
global_reset_counter += 1
class Connection(ExportImport, object):
"""Connection to ZODB for loading and storing objects.
The Connection object serves as a data manager. The root() method
on a Connection returns the root object for the database. This
object and all objects reachable from it are associated with the
Connection that loaded them. When a transaction commits, it uses
the Connection to store modified objects.
Typical use of ZODB is for each thread to have its own
Connection and that no thread should have more than one Connection
to the same database. A thread is associated with a Connection by
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
A Connection can be associated with a single version when it is
created. By default, a Connection is not associated with a
version; it uses non-version data.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
database. At transaction boundaries, these copies are updated to
reflect the current state of the database.
You should not instantiate this class directly; instead call the
open() method of a DB instance.
In many applications, root() is the only method of the Connection
that you will need to use.
Synchronization
---------------
A Connection instance is not thread-safe. It is designed to
support a thread model where each thread has its own transaction.
If an application has more than one thread that uses the
connection or the transaction the connection is registered with,
the application should provide locking.
The Connection manages movement of objects in and out of object
storage.
TODO: We should document an intended API for using a Connection via
multiple threads.
TODO: We should explain that the Connection has a cache and that
multiple calls to get() will return a reference to the same
object, provided that one of the earlier objects is still
referenced. Object identity is preserved within a connection, but
not across connections.
TODO: Mention the database pool.
A database connection always presents a consistent view of the
objects in the database, although it may not always present the
most current revision of any particular object. Modifications
made by concurrent transactions are not visible until the next
transaction boundary (abort or commit).
Two options affect consistency. By default, the mvcc and synch
options are enabled by default.
If you pass mvcc=True to db.open(), the Connection will never read
non-current revisions of an object. Instead it will raise a
ReadConflictError to indicate that the current revision is
unavailable because it was written after the current transaction
began.
The logic for handling modifications assumes that the thread that
opened a Connection (called db.open()) is the thread that will use
the Connection. If this is not true, you should pass synch=False
to db.open(). When the synch option is disabled, some transaction
boundaries will be missed by the Connection; in particular, if a
transaction does not involve any modifications to objects loaded
from the Connection and synch is disabled, the Connection will
miss the transaction boundary. Two examples of this behavior are
db.undo() and read-only transactions.
:Groups:
- `User Methods`: root, get, add, close, db, sync, isReadOnly,
cacheGC, cacheFullSweep, cacheMinimize, getVersion,
modifiedInVersion
- `Experimental Methods`: setLocalTransaction, getTransaction,
onCloseCallbacks
- `Transaction Data Manager Methods`: tpc_begin, tpc_vote,
tpc_finish, tpc_abort, sortKey, abort, commit, commit_sub,
abort_sub
- `Database Invalidation Methods`: invalidate, _setDB
- `IPersistentDataManager Methods`: setstate, register,
setklassstate
- `Other Methods`: oldstate, exchange, getDebugInfo, setDebugInfo,
getTransferCounts
"""Connection to ZODB for loading and storing objects."""
"""
implements(IConnection)
implements(IConnection, IDataManager, IPersistentDataManager)
_tmp = None
_code_timestamp = 0
# ZODB.IConnection
def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
"""Create a new Connection.
A Connection instance should by instantiated by the DB
instance that it is connected to.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- `cache_size`: the target size of the in-memory object
cache, measured in objects.
- `cache_deactivate_after`: deprecated, ignored
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
"""Create a new Connection."""
self._log = logging.getLogger("ZODB.Connection")
self._storage = None
self._debug_info = ()
......@@ -265,79 +160,8 @@ class Connection(ExportImport, object):
connection = new_con
return connection
def getTransaction(self):
"""Get the current transaction for this connection.
:deprecated:
The transaction manager's get method works the same as this
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
deprecated36("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread.
:deprecated:
Returns the transaction manager used by the connection. You
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
deprecated36("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._txn_mgr = transaction.TransactionManager()
if self._synch:
self._txn_mgr.registerSynch(self)
return self._txn_mgr
def _cache_items(self):
# find all items on the lru list
items = self._cache.lru_items()
# fine everything. some on the lru list, some not
everything = self._cache.cache_data
# remove those items that are on the lru list
for k,v in items:
del everything[k]
# return a list of [ghosts....not recently used.....recently used]
return everything.items() + items
def __repr__(self):
if self._version:
ver = ' (in version %s)' % `self._version`
else:
ver = ''
return '<Connection at %08x%s>' % (positive_id(self), ver)
def get(self, oid):
"""Return the persistent object with oid 'oid'.
If the object was not in the cache and the object's class is
ghostable, then a ghost will be returned. If the object is
already in the cache, a reference to the cached object will be
returned.
Applications seldom need to call this method, because objects
are loaded transparently during attribute lookup.
:return: persistent object corresponding to `oid`
:Parameters:
- `oid`: an object id
:Exceptions:
- `KeyError`: if oid does not exist. It is possible that an
object does not exist as of the current transaction, but
existed in the past. It may even exist again in the
future, if the transaction that removed it is undone.
- `ConnectionStateError`: if the connection is closed.
"""
"""Return the persistent object with oid 'oid'."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
......@@ -359,33 +183,8 @@ class Connection(ExportImport, object):
self._cache[oid] = obj
return obj
# deprecate this method?
__getitem__ = get
def add(self, obj):
"""Add a new object 'obj' to the database and assign it an oid.
A persistent object is normally added to the database and
assigned an oid when it becomes reachable to an object already in
the database. In some cases, it is useful to create a new
object and use its oid (_p_oid) in a single transaction.
This method assigns a new oid regardless of whether the object
is reachable.
The object is added when the transaction commits. The object
must implement the IPersistent interface and must not
already be associated with a Connection.
:Parameters:
- `obj`: a Persistent object
:Exceptions:
- `TypeError`: if obj is not a persistent object.
- `InvalidObjectReference`: if obj is already associated
with another connection.
- `ConnectionStateError`: if the connection is closed.
"""
"""Add a new object 'obj' to the database and assign it an oid."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
......@@ -409,75 +208,11 @@ class Connection(ExportImport, object):
raise InvalidObjectReference(obj, obj._p_jar)
def sortKey(self):
# If two connections use the same storage, give them a
# consistent order using id(). This is unique for the
# lifetime of a connection, which is good enough.
return "%s:%s" % (self._sortKey(), id(self))
def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
"""Register odb, the DB that this Connection uses.
This method is called by the DB every time a Connection
is opened. Any invalidations received while the Connection
was closed will be processed.
If the global module function resetCaches() was called, the
cache will be cleared.
:Parameters:
- `odb`: database that owns the Connection
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
# TODO: Why do we go to all the trouble of setting _db and
# other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB
# and Storage.
self._db = odb
self._storage = odb._storage
self._sortKey = odb._storage.sortKey
self.new_oid = odb._storage.new_oid
self._opened = time()
if synch is not None:
self._synch = synch
if mvcc is not None:
self._mvcc = mvcc
self._txn_mgr = txn_mgr or transaction.manager
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
if self._synch:
self._txn_mgr.registerSynch(self)
self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory)
# Multi-database support
self.connections = {self._db.database_name: self}
def _resetCache(self):
"""Creates a new cache, discarding the old one.
See the docstring for the resetCaches() function.
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
"""Return a consistent sort key for this connection."""
return "%s:%s" % (self._storage.sortKey(), id(self))
def abort(self, transaction):
"""Abort modifications to registered objects.
This tells the cache to invalidate changed objects. _p_jar
and _p_oid are deleted from new objects.
"""
"""Abort a transaction and forget all changes."""
for obj in self._registered_objects:
oid = obj._p_oid
assert oid is not None
......@@ -490,70 +225,22 @@ class Connection(ExportImport, object):
self._tpc_cleanup()
# Should there be a way to call incrgc directly?
# Perhaps "full sweep" should do that?
# TODO: we should test what happens when these methods are called
# mid-transaction.
def cacheFullSweep(self, dt=None):
deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn
it into a ghost. It is possible for individual objects to
remain active.
:Parameters:
- `dt`: ignored. It is provided only for backwards compatibility.
"""
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
# TODO: we should test what happens when cacheGC is called mid-transaction.
def cacheGC(self):
"""Reduce cache size to target size.
Call _p_deactivate() on cached objects until the cache size
falls under the target size.
"""
"""Reduce cache size to target size."""
self._cache.incrgc()
__onCloseCallbacks = None
def onCloseCallback(self, f):
"""Register a callable, f, to be called by close().
The callable, f, will be called at most once, the next time
the Connection is closed.
:Parameters:
- `f`: object that will be called on `close`
"""
"""Register a callable, f, to be called by close()."""
if self.__onCloseCallbacks is None:
self.__onCloseCallbacks = []
self.__onCloseCallbacks.append(f)
def close(self):
"""Close the Connection.
A closed Connection should not be used by client code. It
can't load or store objects. Objects in the cache are not
freed, because Connections are re-used and the cache are
expected to be useful to the next client.
When the Connection is closed, all callbacks registered by
onCloseCallback() are invoked and the cache is scanned for
old objects.
"""
"""Close the Connection."""
if not self._needs_to_join:
# We're currently joined to a transaction.
raise ConnectionStateError("Cannot close a connection joined to "
......@@ -590,7 +277,10 @@ class Connection(ExportImport, object):
# assert that here, because self may have been reused (by
# another thread) by the time we get back here.
# transaction.interfaces.IDataManager
def commit(self, transaction):
"""Commit changes to an object"""
if self._import:
# TODO: This code seems important for Zope, but needs docs
# to explain why.
......@@ -668,7 +358,8 @@ class Connection(ExportImport, object):
self._handle_serial(s, oid)
def commit_sub(self, t):
"""Commit all work done in all subtransactions for this transaction."""
"""Commit all changes made in subtransactions and begin 2-phase commit
"""
if self._tmp is None:
return
src = self._storage
......@@ -689,7 +380,7 @@ class Connection(ExportImport, object):
self._handle_serial(s, oid, change=False)
def abort_sub(self, t):
"""Abort work done in all subtransactions for this transaction."""
"""Discard all subtransaction data."""
if self._tmp is None:
return
src = self._storage
......@@ -700,7 +391,7 @@ class Connection(ExportImport, object):
self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None):
"""Dissown any objects newly saved in an uncommitted transaction."""
"""Disown any objects newly saved in an uncommitted transaction."""
if creating is None:
creating = self._creating
self._creating = []
......@@ -712,42 +403,6 @@ class Connection(ExportImport, object):
del o._p_jar
del o._p_oid
def db(self):
return self._db
def getVersion(self):
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._version
def isReadOnly(self):
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects is accessed by
the current transaction, the revision written before C{tid}
will be used.
The DB calls this method, even when the Connection is closed.
:Parameters:
- `tid`: the storage-level id of the transaction that committed
- `oids`: oids is a set of oids, represented as a dict with oids
as keys.
"""
self._inv_lock.acquire()
try:
if self._txn_time is None:
self._txn_time = tid
self._invalidated.update(oids)
finally:
self._inv_lock.release()
# The next two methods are callbacks for transaction synchronization.
def beforeCompletion(self, txn):
......@@ -768,213 +423,35 @@ class Connection(ExportImport, object):
# Now is a good time to collect some garbage
self._cache.incrgc()
def modifiedInVersion(self, oid):
try:
return self._db.modifiedInVersion(oid)
except KeyError:
return self._version
def register(self, obj):
"""Register obj with the current transaction manager.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
obj must be an object loaded from this Connection.
"""
assert obj._p_jar is self
if obj._p_oid is None:
# There is some old Zope code that assigns _p_jar
# directly. That is no longer allowed, but we need to
# provide support for old code that still does it.
# The actual complaint here is that an object without
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
deprecated36("Assigning to _p_jar is deprecated, and will be "
"changed to raise an exception.")
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
self._register(obj)
def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj)
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
def root(self):
"""Return the database root object.
The root is a persistent.mapping.PersistentMapping.
"""
"""Return the database root object."""
return self.get(z64)
def setstate(self, obj):
oid = obj._p_oid
def db(self):
"""Returns a handle to the database this connection belongs to."""
return self._db
def isReadOnly(self):
"""Returns True if the storage for this connection is read only."""
if self._storage is None:
msg = ("Shouldn't load state for %s "
"when the connection is closed" % oid_repr(oid))
self._log.error(msg)
raise ConnectionStateError(msg)
raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
self._inv_lock.acquire()
try:
self._setstate(obj)
except ConflictError:
raise
except:
self._log.error("Couldn't load state for %s", oid_repr(oid),
exc_info=sys.exc_info())
raise
if self._txn_time is None:
self._txn_time = tid
self._invalidated.update(oids)
finally:
self._inv_lock.release()
def _setstate(self, obj):
# Helper for setstate(), which provides logging of failures.
# IDataManager
# The control flow is complicated here to avoid loading an
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.
# If an object has been invalidated, there are several cases
# to consider:
# 1. Check _p_independent()
# 2. Try MVCC
# 3. Raise ConflictError.
# Does anything actually use _p_independent()? It would simplify
# the code if we could drop support for it.
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if (obj._p_oid in self._invalidated
and not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
p, serial = self._storage.load(obj._p_oid, self._version)
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = obj._p_oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
if myhasattr(obj, "_p_independent"):
# This call will raise a ReadConflictError if something
# goes wrong
self._handle_independent(obj)
else:
self._load_before_or_conflict(obj)
return
self._reader.setGhostState(obj, p)
obj._p_serial = serial
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start
return True
def _handle_independent(self, obj):
# Helper method for setstate() handles possibly independent objects
# Call _p_independent(), if it returns True, setstate() wins.
# Otherwise, raise a ConflictError.
if obj._p_independent():
self._inv_lock.acquire()
try:
try:
del self._invalidated[obj._p_oid]
except KeyError:
pass
finally:
self._inv_lock.release()
else:
self._conflicts[obj._p_oid] = 1
self._register(obj)
raise ReadConflictError(object=obj)
def oldstate(self, obj, tid):
"""Return copy of obj that was written by tid.
The returned object does not have the typical metadata
(_p_jar, _p_oid, _p_serial) set. I'm not sure how references
to other peristent objects are handled.
:return: a persistent object
:Parameters:
- `obj`: a persistent object from this Connection.
- `tid`: id of a transaction that wrote an earlier revision.
:Exceptions:
- `KeyError`: if tid does not exist or if tid deleted a revision
of obj.
"""
assert obj._p_jar is self
p = self._storage.loadSerial(obj._p_oid, tid)
return self._reader.getState(p)
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called the cache when an object of type type is invalidated.
try:
oid = obj._p_oid
p, serial = self._storage.load(oid, self._version)
# We call getGhost(), but we actually get a non-ghost back.
# The object is a class, which can't actually be ghosted.
copy = self._reader.getGhost(p)
obj.__dict__.clear()
obj.__dict__.update(copy.__dict__)
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = 0
obj._p_serial = serial
except:
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def tpc_begin(self, transaction, sub=False):
self._modified = []
def tpc_begin(self, transaction, sub=False):
"""Begin commit of a transaction, starting the two-phase commit."""
self._modified = []
# _creating is a list of oids of new objects, which is used to
# remove them from the cache if a transaction aborts.
......@@ -987,6 +464,7 @@ class Connection(ExportImport, object):
self._storage.tpc_begin(transaction)
def tpc_vote(self, transaction):
"""Verify that a data manager can commit the transaction."""
try:
vote = self._storage.tpc_vote
except AttributeError:
......@@ -1037,12 +515,7 @@ class Connection(ExportImport, object):
obj._p_serial = serial
def tpc_finish(self, transaction):
# It's important that the storage calls the function we pass
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
"""Indicate confirmation that the transaction is done."""
if self._tmp is not None:
# Commiting a subtransaction!
# There is no need to invalidate anything.
......@@ -1059,6 +532,7 @@ class Connection(ExportImport, object):
self._tpc_cleanup()
def tpc_abort(self, transaction):
"""Abort a transaction."""
if self._import:
self._import = None
self._storage.tpc_abort(transaction)
......@@ -1070,16 +544,16 @@ class Connection(ExportImport, object):
del obj._p_jar
self._tpc_cleanup()
# Common cleanup actions after tpc_finish/tpc_abort.
def _tpc_cleanup(self):
"""Performs cleanup operations to support tpc_finish and tpc_abort."""
self._conflicts.clear()
if not self._synch:
self._flush_invalidations()
self._needs_to_join = True
self._registered_objects = []
def sync(self):
"""Manually update the view on the database."""
self._txn_mgr.get().abort()
sync = getattr(self._storage, 'sync', 0)
if sync:
......@@ -1087,22 +561,304 @@ class Connection(ExportImport, object):
self._flush_invalidations()
def getDebugInfo(self):
"""Returns a tuple with different items for debugging the
connection.
"""
return self._debug_info
def setDebugInfo(self, *args):
"""Add the given items to the debug information of this connection."""
self._debug_info = self._debug_info + args
def getTransferCounts(self, clear=False):
"""Returns the number of objects loaded and stored.
If clear is True, reset the counters.
"""
"""Returns the number of objects loaded and stored."""
res = self._load_count, self._store_count
if clear:
self._load_count = 0
self._store_count = 0
return res
##############################################
# persistent.interfaces.IPersistentDatamanager
def oldstate(self, obj, tid):
"""Return copy of 'obj' that was written by transaction 'tid'."""
assert obj._p_jar is self
p = self._storage.loadSerial(obj._p_oid, tid)
return self._reader.getState(p)
def setstate(self, obj):
"""Turns the ghost 'obj' into a real object by loading it's from the
database."""
oid = obj._p_oid
if self._storage is None:
msg = ("Shouldn't load state for %s "
"when the connection is closed" % oid_repr(oid))
self._log.error(msg)
raise ConnectionStateError(msg)
try:
self._setstate(obj)
except ConflictError:
raise
except:
self._log.error("Couldn't load state for %s", oid_repr(oid),
exc_info=sys.exc_info())
raise
def _setstate(self, obj):
# Helper for setstate(), which provides logging of failures.
# The control flow is complicated here to avoid loading an
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.
# If an object has been invalidated, there are several cases
# to consider:
# 1. Check _p_independent()
# 2. Try MVCC
# 3. Raise ConflictError.
# Does anything actually use _p_independent()? It would simplify
# the code if we could drop support for it.
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if (obj._p_oid in self._invalidated
and not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
p, serial = self._storage.load(obj._p_oid, self._version)
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = obj._p_oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
if myhasattr(obj, "_p_independent"):
# This call will raise a ReadConflictError if something
# goes wrong
self._handle_independent(obj)
else:
self._load_before_or_conflict(obj)
return
self._reader.setGhostState(obj, p)
obj._p_serial = serial
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start
return True
def _handle_independent(self, obj):
# Helper method for setstate() handles possibly independent objects
# Call _p_independent(), if it returns True, setstate() wins.
# Otherwise, raise a ConflictError.
if obj._p_independent():
self._inv_lock.acquire()
try:
try:
del self._invalidated[obj._p_oid]
except KeyError:
pass
finally:
self._inv_lock.release()
else:
self._conflicts[obj._p_oid] = 1
self._register(obj)
raise ReadConflictError(object=obj)
def register(self, obj):
"""Register obj with the current transaction manager.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
obj must be an object loaded from this Connection.
"""
assert obj._p_jar is self
if obj._p_oid is None:
# There is some old Zope code that assigns _p_jar
# directly. That is no longer allowed, but we need to
# provide support for old code that still does it.
# The actual complaint here is that an object without
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
deprecated36("Assigning to _p_jar is deprecated, and will be "
"changed to raise an exception.")
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
self._register(obj)
def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj)
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
# PROTECTED stuff (used by e.g. ZODB.DB.DB)
def _cache_items(self):
# find all items on the lru list
items = self._cache.lru_items()
# fine everything. some on the lru list, some not
everything = self._cache.cache_data
# remove those items that are on the lru list
for k,v in items:
del everything[k]
# return a list of [ghosts....not recently used.....recently used]
return everything.items() + items
def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
"""Register odb, the DB that this Connection uses.
This method is called by the DB every time a Connection
is opened. Any invalidations received while the Connection
was closed will be processed.
If the global module function resetCaches() was called, the
cache will be cleared.
Parameters:
odb: database that owns the Connection
mvcc: boolean indicating whether MVCC is enabled
txn_mgr: transaction manager to use. None means
used the default transaction manager.
synch: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
# TODO: Why do we go to all the trouble of setting _db and
# other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB
# and Storage.
self._db = odb
self._storage = odb._storage
self.new_oid = odb._storage.new_oid
self._opened = time()
if synch is not None:
self._synch = synch
if mvcc is not None:
self._mvcc = mvcc
self._txn_mgr = txn_mgr or transaction.manager
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
if self._synch:
self._txn_mgr.registerSynch(self)
self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory)
# Multi-database support
self.connections = {self._db.database_name: self}
def _resetCache(self):
"""Creates a new cache, discarding the old one.
See the docstring for the resetCaches() function.
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
# Python protocol
def __repr__(self):
if self._version:
ver = ' (in version %s)' % `self._version`
else:
ver = ''
return '<Connection at %08x%s>' % (positive_id(self), ver)
# DEPRECATION candidates
__getitem__ = get
def modifiedInVersion(self, oid):
"""Returns the version the object with the given oid was modified in.
If it wasn't modified in a version, the current version of this
connection is returned.
"""
try:
return self._db.modifiedInVersion(oid)
except KeyError:
import pdb; pdb.set_trace()
return self.getVersion()
def getVersion(self):
"""Returns the version this connection is attached to."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._version
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called the cache when an object of type type is invalidated.
try:
oid = obj._p_oid
p, serial = self._storage.load(oid, self._version)
# We call getGhost(), but we actually get a non-ghost back.
# The object is a class, which can't actually be ghosted.
copy = self._reader.getGhost(p)
obj.__dict__.clear()
obj.__dict__.update(copy.__dict__)
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = 0
obj._p_serial = serial
except:
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def exchange(self, old, new):
# called by a ZClasses method that isn't executed by the test suite
oid = old._p_oid
......@@ -1111,3 +867,52 @@ class Connection(ExportImport, object):
new._p_changed = 1
self._register(new)
self._cache[oid] = new
# DEPRECATED methods
def getTransaction(self):
"""Get the current transaction for this connection.
:deprecated:
The transaction manager's get method works the same as this
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
deprecated36("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread.
:deprecated:
Returns the transaction manager used by the connection. You
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
deprecated36("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._txn_mgr = transaction.TransactionManager()
if self._synch:
self._txn_mgr.registerSynch(self)
return self._txn_mgr
def cacheFullSweep(self, dt=None):
deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache."""
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
......@@ -581,7 +581,7 @@ class DB(object):
def get_info(c):
# `result`, `time` and `version` are lexically inherited.
o = c._opened
d = c._debug_info
d = c.getDebugInfo()
if d:
if len(d) == 1:
d = d[0]
......
......@@ -61,6 +61,9 @@ class TmpStore:
serial = h[:8]
return self._file.read(size), serial
def sortKey(self):
return self._storage.sortKey()
# TODO: clarify difference between self._storage & self._db._storage
def modifiedInVersion(self, oid):
......
......@@ -19,11 +19,118 @@ $Id$
from zope.interface import Interface, Attribute
class IConnection(Interface):
"""ZODB connection.
"""Connection to ZODB for loading and storing objects.
TODO: This interface is incomplete.
The Connection object serves as a data manager. The root() method
on a Connection returns the root object for the database. This
object and all objects reachable from it are associated with the
Connection that loaded them. When a transaction commits, it uses
the Connection to store modified objects.
Typical use of ZODB is for each thread to have its own
Connection and that no thread should have more than one Connection
to the same database. A thread is associated with a Connection by
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
A Connection can be associated with a single version when it is
created. By default, a Connection is not associated with a
version; it uses non-version data.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
database. At transaction boundaries, these copies are updated to
reflect the current state of the database.
You should not instantiate this class directly; instead call the
open() method of a DB instance.
In many applications, root() is the only method of the Connection
that you will need to use.
Synchronization
---------------
A Connection instance is not thread-safe. It is designed to
support a thread model where each thread has its own transaction.
If an application has more than one thread that uses the
connection or the transaction the connection is registered with,
the application should provide locking.
The Connection manages movement of objects in and out of object
storage.
TODO: We should document an intended API for using a Connection via
multiple threads.
TODO: We should explain that the Connection has a cache and that
multiple calls to get() will return a reference to the same
object, provided that one of the earlier objects is still
referenced. Object identity is preserved within a connection, but
not across connections.
TODO: Mention the database pool.
A database connection always presents a consistent view of the
objects in the database, although it may not always present the
most current revision of any particular object. Modifications
made by concurrent transactions are not visible until the next
transaction boundary (abort or commit).
Two options affect consistency. By default, the mvcc and synch
options are enabled by default.
If you pass mvcc=True to db.open(), the Connection will never read
non-current revisions of an object. Instead it will raise a
ReadConflictError to indicate that the current revision is
unavailable because it was written after the current transaction
began.
The logic for handling modifications assumes that the thread that
opened a Connection (called db.open()) is the thread that will use
the Connection. If this is not true, you should pass synch=False
to db.open(). When the synch option is disabled, some transaction
boundaries will be missed by the Connection; in particular, if a
transaction does not involve any modifications to objects loaded
from the Connection and synch is disabled, the Connection will
miss the transaction boundary. Two examples of this behavior are
db.undo() and read-only transactions.
Groups of methods:
User Methods:
root, get, add, close, db, sync, isReadOnly, cacheGC, cacheFullSweep,
cacheMinimize, getVersion, modifiedInVersion
Experimental Methods:
onCloseCallbacks
Database Invalidation Methods:
invalidate
Other Methods: exchange, getDebugInfo, setDebugInfo,
getTransferCounts
"""
def __init__(version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
"""Create a new Connection.
A Connection instance should by instantiated by the DB
instance that it is connected to.
Parameters:
version: the "version" that all changes will be made in, defaults
to no version.
cache_size: the target size of the in-memory object cache, measured
in objects.
mvcc: boolean indicating whether MVCC is enabled
txn_mgr: transaction manager to use. None means used the default
transaction manager.
synch: boolean indicating whether Connection should register for
afterCompletion() calls.
"""
def add(ob):
"""Add a new object 'obj' to the database and assign it an oid.
......@@ -39,8 +146,108 @@ class IConnection(Interface):
The object is added when the transaction commits. The object
must implement the IPersistent interface and must not
already be associated with a Connection.
Parameters:
obj: a Persistent object
Raises TypeError if obj is not a persistent object.
Raises InvalidObjectReference if obj is already associated with another
connection.
Raises ConnectionStateError if the connection is closed.
"""
def get(oid):
"""Return the persistent object with oid 'oid'.
If the object was not in the cache and the object's class is
ghostable, then a ghost will be returned. If the object is
already in the cache, a reference to the cached object will be
returned.
Applications seldom need to call this method, because objects
are loaded transparently during attribute lookup.
Parameters:
oid: an object id
Raises KeyError if oid does not exist.
It is possible that an
object does not exist as of the current transaction, but
existed in the past. It may even exist again in the
future, if the transaction that removed it is undone.
Raises ConnectionStateError if the connection is closed.
"""
def cacheMinimize():
"""Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn
it into a ghost. It is possible for individual objects to
remain active.
"""
def cacheGC():
"""Reduce cache size to target size.
Call _p_deactivate() on cached objects until the cache size
falls under the target size.
"""
def onCloseCallback(f):
"""Register a callable, f, to be called by close().
f will be called with no arguments before the Connection is closed.
Parameters:
f: method that will be called on `close`
"""
def close():
"""Close the Connection.
When the Connection is closed, all callbacks registered by
onCloseCallback() are invoked and the cache is garbage collected.
A closed Connection should not be used by client code. It can't load
or store objects. Objects in the cache are not freed, because
Connections are re-used and the cache is expected to be useful to the
next client.
"""
def db():
"""Returns a handle to the database this connection belongs to."""
def isReadOnly():
"""Returns True if the storage for this connection is read only."""
def invalidate(tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects are accessed by the
current transaction, the revision written before Connection.tid will be
used.
The DB calls this method, even when the Connection is closed.
Parameters:
tid: the storage-level id of the transaction that committed
oids: oids is a set of oids, represented as a dict with oids as keys.
"""
def root():
"""Return the database root object.
The root is a persistent.mapping.PersistentMapping.
"""
def getVersion():
"""Returns the version this connection is attached to."""
# Multi-database support.
connections = Attribute("""\
......@@ -114,3 +321,32 @@ class IDatabase(Interface):
entry.
""")
def sync(self):
"""Manually update the view on the database.
This includes aborting the current transaction, getting a fresh and
consistent view of the data (synchronizing with the storage if possible)
and call cacheGC() for this connection.
This method was especially useful in ZODB 3.2 to better support
read-only connections that were affected by a couple of problems.
"""
# Debug information
def getDebugInfo():
"""Returns a tuple with different items for debugging the connection.
Debug information can be added to a connection by using setDebugInfo.
"""
def setDebugInfo(*items):
"""Add the given items to the debug information of this connection."""
def getTransferCounts(clear=False):
"""Returns the number of objects loaded and stored.
If clear is True, reset the counters.
"""
......@@ -257,18 +257,35 @@ class IPersistentDataManager(Interface):
def setstate(object):
"""Load the state for the given object.
The object should be in the ghost state.
The object's state will be set and the object will end up
in the saved state.
The object should be in the ghost state. The object's state will be
set and the object will end up in the saved state.
The object must provide the IPersistent interface.
"""
def oldstate(obj, tid):
"""Return copy of 'obj' that was written by transaction 'tid'.
The returned object does not have the typical metadata (_p_jar, _p_oid,
_p_serial) set. I'm not sure how references to other peristent objects
are handled.
Parameters
obj: a persistent object from this Connection.
tid: id of a transaction that wrote an earlier revision.
Raises KeyError if tid does not exist or if tid deleted a revision of
obj.
"""
def register(object):
"""Register an IPersistent with the current transaction.
This method must be called when the object transitions to
the changed state.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
"""
def mtime(object):
......
......@@ -18,104 +18,7 @@ $Id$
import zope.interface
class IResourceManager(zope.interface.Interface):
"""Objects that manage resources transactionally.
These objects may manage data for other objects, or they may manage
non-object storages, such as relational databases.
IDataManagerOriginal is the interface currently provided by ZODB
database connections, but the intent is to move to the newer
IDataManager.
"""
# Two-phase commit protocol. These methods are called by the
# ITransaction object associated with the transaction being
# committed.
def tpc_begin(transaction):
"""Begin two-phase commit, to save data changes.
An implementation should do as much work as possible without
making changes permanent. Changes should be made permanent
when tpc_finish is called (or aborted if tpc_abort is called).
The work can be divided between tpc_begin() and tpc_vote(), and
the intent is that tpc_vote() be as fast as possible (to minimize
the period of uncertainty).
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def tpc_vote(transaction):
"""Verify that a resource manager can commit the transaction.
This is the last chance for a resource manager to vote 'no'. A
resource manager votes 'no' by raising an exception.
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def tpc_finish(transaction):
"""Indicate confirmation that the transaction is done.
transaction is the ITransaction instance associated with the
transaction being committed.
This should never fail. If this raises an exception, the
database is not expected to maintain consistency; it's a
serious error.
"""
def tpc_abort(transaction):
"""Abort a transaction.
transaction is the ITransaction instance associated with the
transaction being committed.
All changes made by the current transaction are aborted. Note
that this includes all changes stored in any savepoints that may
be associated with the current transaction.
tpc_abort() can be called at any time, either in or out of the
two-phase commit.
This should never fail.
"""
# The savepoint/rollback API.
def savepoint(transaction):
"""Save partial transaction changes.
There are two purposes:
1) To allow discarding partial changes without discarding all
dhanges.
2) To checkpoint changes to disk that would otherwise live in
memory for the duration of the transaction.
Returns an object implementing ISavePoint2 that can be used
to discard changes made since the savepoint was captured.
An implementation that doesn't support savepoints should implement
this method by returning a savepoint object that raises an
exception when its rollback method is called. The savepoint method
shouldn't raise an error. This way, transactions that create
savepoints can proceed as long as an attempt is never made to roll
back a savepoint.
"""
def discard(transaction):
"""Discard changes within the transaction since the last savepoint.
That means changes made since the last savepoint if one exists, or
since the start of the transaction.
"""
class IDataManagerOriginal(zope.interface.Interface):
class IDataManager(zope.interface.Interface):
"""Objects that manage transactional storage.
These objects may manage data for other objects, or they may manage
......@@ -155,7 +58,7 @@ class IDataManagerOriginal(zope.interface.Interface):
has been called; this is only used when the transaction is
being committed.
This call also implied the beginning of 2-phase commit.
This call also implies the beginning of 2-phase commit.
"""
# Two-phase commit protocol. These methods are called by the
......@@ -180,10 +83,12 @@ class IDataManagerOriginal(zope.interface.Interface):
"""
def tpc_abort(transaction):
"""Abort a transaction.
This is called by a transaction manager to end a two-phase commit on
the data manager.
This is always called after a tpc_begin call.
transaction is the ITransaction instance associated with the
......@@ -202,6 +107,11 @@ class IDataManagerOriginal(zope.interface.Interface):
database is not expected to maintain consistency; it's a
serious error.
It's important that the storage calls the passed function
while it still has its lock. We don't want another thread
to be able to read any updated data until we've had a chance
to send an invalidation message to all of the other
connections!
"""
def tpc_vote(transaction):
......@@ -214,125 +124,46 @@ class IDataManagerOriginal(zope.interface.Interface):
transaction being committed.
"""
def commit(object, transaction):
"""CCCommit changes to an object
def commit(transaction):
"""Commit modifications to registered objects.
Save the object as part of the data to be made persistent if
the transaction commits.
"""
def abort(object, transaction):
"""Abort changes to an object
Only changes made since the last transaction or
sub-transaction boundary are discarded.
This method may be called either:
o Outside of two-phase commit, or
o In the first phase of two-phase commit
"""
def sortKey():
"""
Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
class IDataManager(zope.interface.Interface):
"""Data management interface for storing objects transactionally.
ZODB database connections currently provides the older
IDataManagerOriginal interface, but the intent is to move to this newer
IDataManager interface.
Our hope is that this interface will evolve and become the standard
interface. There are some issues to be resolved first, like:
- Probably want separate abort methods for use in and out of
two-phase commit.
- The savepoint api may need some more thought.
"""
def prepare(transaction):
"""Perform the first phase of a 2-phase commit
The data manager prepares for commit any changes to be made
persistent. A normal return from this method indicated that
the data manager is ready to commit the changes.
The data manager must raise an exception if it is not prepared
to commit the transaction after executing prepare().
The transaction must match that used for preceeding
savepoints, if any.
This includes conflict detection and handling. If no conflicts or
errors occur it saves the objects in the storage.
"""
# This is equivalent to zodb3's tpc_begin, commit, and
# tpc_vote combined.
def abort(transaction):
"""Abort changes made by transaction
This may be called before two-phase commit or in the second
phase of two-phase commit.
The transaction must match that used for preceeding
savepoints, if any.
"""
# This is equivalent to *both* zodb3's abort and tpc_abort
# calls. This should probably be split into 2 methods.
def commit(transaction):
"""Finish two-phase commit
The prepare method must be called, with the same transaction,
before calling commit.
"""
# This is equivalent to zodb3's tpc_finish
def savepoint(transaction):
"""Do tentative commit of changes to this point.
Should return an object implementing IRollback that can be used
to rollback to the savepoint.
Note that (unlike zodb3) this doesn't use a 2-phase commit
protocol. If this call fails, or if a rollback call on the
result fails, the (containing) transaction should be
aborted. Aborting the containing transaction is *not* the
responsibility of the data manager, however.
"""Abort a transaction and forget all changes.
An implementation that doesn't support savepoints should
implement this method by returning a rollback implementation
that always raises an error when it's rollback method is
called. The savepoing method shouldn't raise an error. This
way, transactions that create savepoints can proceed as long
as an attempt is never made to roll back a savepoint.
Abort must be called outside of a two-phase commit.
Abort is called by the transaction manager to abort transactions
that are not yet in a two-phase commit.
"""
def sortKey():
"""
Return a key to use for ordering registered DataManagers
"""Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
# XXX: Alternate version:
#"""Return a consistent sort key for this connection.
#
#This allows ordering multiple connections that use the same storage in
#a consistent manner. This is unique for the lifetime of a connection,
#which is good enough to avoid ZEO deadlocks.
#"""
def beforeCompletion(transaction):
"""Hook that is called by the transaction before completing a commit"""
def afterCompletion(transaction):
"""Hook that is called by the transaction after completing a commit"""
class ITransaction(zope.interface.Interface):
"""Object representing a running transaction.
......@@ -414,34 +245,6 @@ class ITransaction(zope.interface.Interface):
# Unsure: is this allowed to cause an exception here, during
# the two-phase commit, or can it toss data silently?
class ISavePoint(zope.interface.Interface):
"""ISavePoint objects represent partial transaction changes.
Sequences of savepoint objects are associated with transactions,
and with IResourceManagers.
"""
def rollback():
"""Discard changes made after this savepoint.
This includes discarding (call the discard method on) all
subsequent savepoints.
"""
def discard():
"""Discard changes saved by this savepoint.
That means changes made since the immediately preceding
savepoint if one exists, or since the start of the transaction,
until this savepoint.
Once a savepoint has been discarded, it's an error to attempt
to rollback or discard it again.
"""
next_savepoint = zope.interface.Attribute(
"""The next savepoint (later in time), or None if self is the
most recent savepoint.""")
class IRollback(zope.interface.Interface):
......@@ -457,3 +260,4 @@ class IRollback(zope.interface.Interface):
- The transaction has ended.
"""
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment