Commit ab1bbbb4 authored by Tim Peters

Merge pycon-multidb branch (-r 29573:29605).

This introduces a "multi-database" concept (a simplification
of Jim's Wiki proposal), and adds many interface definitions.

Work done during the PyCon 2005 ZODB sprint, by Christian
Theune, Jim Fulton and Tim Peters.
parent 16d1c10b
...@@ -252,6 +252,12 @@ class BaseStorage(UndoLogCompatible): ...@@ -252,6 +252,12 @@ class BaseStorage(UndoLogCompatible):
pass pass
def tpc_finish(self, transaction, f=None): def tpc_finish(self, transaction, f=None):
# It's important that the storage calls the function we pass
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
self._lock_acquire() self._lock_acquire()
try: try:
if transaction is not self._transaction: if transaction is not self._transaction:
......
...@@ -23,6 +23,12 @@ from time import time ...@@ -23,6 +23,12 @@ from time import time
from persistent import PickleCache from persistent import PickleCache
# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from transaction.interfaces import IDataManager
from zope.interface import implements
import transaction import transaction
from ZODB.ConflictResolution import ResolvedSerial from ZODB.ConflictResolution import ResolvedSerial
...@@ -31,12 +37,9 @@ from ZODB.POSException \ ...@@ -31,12 +37,9 @@ from ZODB.POSException \
import ConflictError, ReadConflictError, InvalidObjectReference, \ import ConflictError, ReadConflictError, InvalidObjectReference, \
ConnectionStateError ConnectionStateError
from ZODB.TmpStore import TmpStore from ZODB.TmpStore import TmpStore
from ZODB.utils import u64, oid_repr, z64, positive_id
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
from ZODB.interfaces import IConnection from ZODB.utils import u64, oid_repr, z64, positive_id, \
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36 DEPRECATED_ARGUMENT, deprecated36
from zope.interface import implements
global_reset_counter = 0 global_reset_counter = 0
...@@ -54,127 +57,19 @@ def resetCaches(): ...@@ -54,127 +57,19 @@ def resetCaches():
global_reset_counter += 1 global_reset_counter += 1
class Connection(ExportImport, object): class Connection(ExportImport, object):
"""Connection to ZODB for loading and storing objects. """Connection to ZODB for loading and storing objects."""
The Connection object serves as a data manager. The root() method
on a Connection returns the root object for the database. This
object and all objects reachable from it are associated with the
Connection that loaded them. When a transaction commits, it uses
the Connection to store modified objects.
Typical use of ZODB is for each thread to have its own
Connection and that no thread should have more than one Connection
to the same database. A thread is associated with a Connection by
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
A Connection can be associated with a single version when it is
created. By default, a Connection is not associated with a
version; it uses non-version data.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
database. At transaction boundaries, these copies are updated to
reflect the current state of the database.
You should not instantiate this class directly; instead call the
open() method of a DB instance.
In many applications, root() is the only method of the Connection
that you will need to use.
Synchronization
---------------
A Connection instance is not thread-safe. It is designed to
support a thread model where each thread has its own transaction.
If an application has more than one thread that uses the
connection or the transaction the connection is registered with,
the application should provide locking.
The Connection manages movement of objects in and out of object
storage.
TODO: We should document an intended API for using a Connection via
multiple threads.
TODO: We should explain that the Connection has a cache and that
multiple calls to get() will return a reference to the same
object, provided that one of the earlier objects is still
referenced. Object identity is preserved within a connection, but
not across connections.
TODO: Mention the database pool.
A database connection always presents a consistent view of the
objects in the database, although it may not always present the
most current revision of any particular object. Modifications
made by concurrent transactions are not visible until the next
transaction boundary (abort or commit).
Two options affect consistency: mvcc and synch. Both options are
enabled by default.
If you pass mvcc=False to db.open(), the Connection will never read
non-current revisions of an object. Instead it will raise a
ReadConflictError to indicate that the current revision is
unavailable because it was written after the current transaction
began.
The logic for handling modifications assumes that the thread that
opened a Connection (called db.open()) is the thread that will use
the Connection. If this is not true, you should pass synch=False
to db.open(). When the synch option is disabled, some transaction
boundaries will be missed by the Connection; in particular, if a
transaction does not involve any modifications to objects loaded
from the Connection and synch is disabled, the Connection will
miss the transaction boundary. Two examples of this behavior are
db.undo() and read-only transactions.
:Groups:
- `User Methods`: root, get, add, close, db, sync, isReadOnly,
cacheGC, cacheFullSweep, cacheMinimize, getVersion,
modifiedInVersion
- `Experimental Methods`: setLocalTransaction, getTransaction,
onCloseCallbacks
- `Transaction Data Manager Methods`: tpc_begin, tpc_vote,
tpc_finish, tpc_abort, sortKey, abort, commit, commit_sub,
abort_sub
- `Database Invalidation Methods`: invalidate, _setDB
- `IPersistentDataManager Methods`: setstate, register,
setklassstate
- `Other Methods`: oldstate, exchange, getDebugInfo, setDebugInfo,
getTransferCounts
""" implements(IConnection, IDataManager, IPersistentDataManager)
implements(IConnection)
_tmp = None _tmp = None
_code_timestamp = 0 _code_timestamp = 0
# ZODB.IConnection
def __init__(self, version='', cache_size=400, def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None, cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True): synch=True):
"""Create a new Connection. """Create a new Connection."""
A Connection instance should be instantiated by the DB
instance that it is connected to.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- `cache_size`: the target size of the in-memory object
cache, measured in objects.
- `cache_deactivate_after`: deprecated, ignored
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
use the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
self._log = logging.getLogger("ZODB.Connection") self._log = logging.getLogger("ZODB.Connection")
self._storage = None self._storage = None
self._debug_info = () self._debug_info = ()
...@@ -220,7 +115,7 @@ class Connection(ExportImport, object): ...@@ -220,7 +115,7 @@ class Connection(ExportImport, object):
# from a single transaction should be applied atomically, so # from a single transaction should be applied atomically, so
# the lock must be held when reading _invalidated. # the lock must be held when reading _invalidated.
# It sucks that we have to hold the lock to read _invalidated. # It sucks that we have to hold the lock to read _invalidated.
# Normally, _invalidated is written by calling dict.update, which # Normally, _invalidated is written by calling dict.update, which
# will execute atomically by virtue of the GIL. But some storage # will execute atomically by virtue of the GIL. But some storage
# might generate oids where hash or compare invokes Python code. In # might generate oids where hash or compare invokes Python code. In
...@@ -253,79 +148,20 @@ class Connection(ExportImport, object): ...@@ -253,79 +148,20 @@ class Connection(ExportImport, object):
# to pass to _importDuringCommit(). # to pass to _importDuringCommit().
self._import = None self._import = None
def getTransaction(self): self.connections = None
"""Get the current transaction for this connection.
:deprecated:
The transaction manager's get method works the same as this
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
deprecated36("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread.
:deprecated:
Returns the transaction manager used by the connection. You
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
deprecated36("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._txn_mgr = transaction.TransactionManager()
if self._synch:
self._txn_mgr.registerSynch(self)
return self._txn_mgr
def _cache_items(self):
# find all items on the lru list
items = self._cache.lru_items()
# find everything: some on the lru list, some not
everything = self._cache.cache_data
# remove those items that are on the lru list
for k,v in items:
del everything[k]
# return a list of [ghosts....not recently used.....recently used]
return everything.items() + items
def __repr__(self): def get_connection(self, database_name):
if self._version: """Return a Connection for the named database."""
ver = ' (in version %s)' % `self._version` connection = self.connections.get(database_name)
else: if connection is None:
ver = '' new_con = self._db.databases[database_name].open()
return '<Connection at %08x%s>' % (positive_id(self), ver) self.connections.update(new_con.connections)
new_con.connections = self.connections
connection = new_con
return connection
def get(self, oid): def get(self, oid):
"""Return the persistent object with oid 'oid'. """Return the persistent object with oid 'oid'."""
If the object was not in the cache and the object's class is
ghostable, then a ghost will be returned. If the object is
already in the cache, a reference to the cached object will be
returned.
Applications seldom need to call this method, because objects
are loaded transparently during attribute lookup.
:return: persistent object corresponding to `oid`
:Parameters:
- `oid`: an object id
:Exceptions:
- `KeyError`: if oid does not exist. It is possible that an
object does not exist as of the current transaction, but
existed in the past. It may even exist again in the
future, if the transaction that removed it is undone.
- `ConnectionStateError`: if the connection is closed.
"""
if self._storage is None: if self._storage is None:
raise ConnectionStateError("The database connection is closed") raise ConnectionStateError("The database connection is closed")
...@@ -347,33 +183,8 @@ class Connection(ExportImport, object): ...@@ -347,33 +183,8 @@ class Connection(ExportImport, object):
self._cache[oid] = obj self._cache[oid] = obj
return obj return obj
# deprecate this method?
__getitem__ = get
def add(self, obj): def add(self, obj):
"""Add a new object 'obj' to the database and assign it an oid. """Add a new object 'obj' to the database and assign it an oid."""
A persistent object is normally added to the database and
assigned an oid when it becomes reachable to an object already in
the database. In some cases, it is useful to create a new
object and use its oid (_p_oid) in a single transaction.
This method assigns a new oid regardless of whether the object
is reachable.
The object is added when the transaction commits. The object
must implement the IPersistent interface and must not
already be associated with a Connection.
:Parameters:
- `obj`: a Persistent object
:Exceptions:
- `TypeError`: if obj is not a persistent object.
- `InvalidObjectReference`: if obj is already associated
with another connection.
- `ConnectionStateError`: if the connection is closed.
"""
if self._storage is None: if self._storage is None:
raise ConnectionStateError("The database connection is closed") raise ConnectionStateError("The database connection is closed")
...@@ -397,72 +208,11 @@ class Connection(ExportImport, object): ...@@ -397,72 +208,11 @@ class Connection(ExportImport, object):
raise InvalidObjectReference(obj, obj._p_jar) raise InvalidObjectReference(obj, obj._p_jar)
def sortKey(self): def sortKey(self):
# If two connections use the same storage, give them a """Return a consistent sort key for this connection."""
# consistent order using id(). This is unique for the return "%s:%s" % (self._storage.sortKey(), id(self))
# lifetime of a connection, which is good enough.
return "%s:%s" % (self._sortKey(), id(self))
def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
"""Register odb, the DB that this Connection uses.
This method is called by the DB every time a Connection
is opened. Any invalidations received while the Connection
was closed will be processed.
If the global module function resetCaches() was called, the
cache will be cleared.
:Parameters:
- `odb`: database that owns the Connection
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
use the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
# TODO: Why do we go to all the trouble of setting _db and
# other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB
# and Storage.
self._db = odb
self._storage = odb._storage
self._sortKey = odb._storage.sortKey
self.new_oid = odb._storage.new_oid
self._opened = time()
if synch is not None:
self._synch = synch
if mvcc is not None:
self._mvcc = mvcc
self._txn_mgr = txn_mgr or transaction.manager
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
if self._synch:
self._txn_mgr.registerSynch(self)
self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory)
def _resetCache(self):
"""Creates a new cache, discarding the old one.
See the docstring for the resetCaches() function.
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
def abort(self, transaction): def abort(self, transaction):
"""Abort modifications to registered objects. """Abort a transaction and forget all changes."""
This tells the cache to invalidate changed objects. _p_jar
and _p_oid are deleted from new objects.
"""
for obj in self._registered_objects: for obj in self._registered_objects:
oid = obj._p_oid oid = obj._p_oid
assert oid is not None assert oid is not None
...@@ -475,70 +225,22 @@ class Connection(ExportImport, object): ...@@ -475,70 +225,22 @@ class Connection(ExportImport, object):
self._tpc_cleanup() self._tpc_cleanup()
# Should there be a way to call incrgc directly? # TODO: we should test what happens when cacheGC is called mid-transaction.
# Perhaps "full sweep" should do that?
# TODO: we should test what happens when these methods are called
# mid-transaction.
def cacheFullSweep(self, dt=None):
deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn
it into a ghost. It is possible for individual objects to
remain active.
:Parameters:
- `dt`: ignored. It is provided only for backwards compatibility.
"""
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
def cacheGC(self): def cacheGC(self):
"""Reduce cache size to target size. """Reduce cache size to target size."""
Call _p_deactivate() on cached objects until the cache size
falls under the target size.
"""
self._cache.incrgc() self._cache.incrgc()
__onCloseCallbacks = None __onCloseCallbacks = None
def onCloseCallback(self, f): def onCloseCallback(self, f):
"""Register a callable, f, to be called by close(). """Register a callable, f, to be called by close()."""
The callable, f, will be called at most once, the next time
the Connection is closed.
:Parameters:
- `f`: object that will be called on `close`
"""
if self.__onCloseCallbacks is None: if self.__onCloseCallbacks is None:
self.__onCloseCallbacks = [] self.__onCloseCallbacks = []
self.__onCloseCallbacks.append(f) self.__onCloseCallbacks.append(f)
def close(self): def close(self):
"""Close the Connection. """Close the Connection."""
A closed Connection should not be used by client code. It
can't load or store objects. Objects in the cache are not
freed, because Connections are re-used and the cache is
expected to be useful to the next client.
When the Connection is closed, all callbacks registered by
onCloseCallback() are invoked and the cache is scanned for
old objects.
"""
if not self._needs_to_join: if not self._needs_to_join:
# We're currently joined to a transaction. # We're currently joined to a transaction.
raise ConnectionStateError("Cannot close a connection joined to " raise ConnectionStateError("Cannot close a connection joined to "
...@@ -575,7 +277,10 @@ class Connection(ExportImport, object): ...@@ -575,7 +277,10 @@ class Connection(ExportImport, object):
# assert that here, because self may have been reused (by # assert that here, because self may have been reused (by
# another thread) by the time we get back here. # another thread) by the time we get back here.
# transaction.interfaces.IDataManager
def commit(self, transaction): def commit(self, transaction):
"""Commit changes to an object"""
if self._import: if self._import:
# TODO: This code seems important for Zope, but needs docs # TODO: This code seems important for Zope, but needs docs
# to explain why. # to explain why.
...@@ -653,7 +358,8 @@ class Connection(ExportImport, object): ...@@ -653,7 +358,8 @@ class Connection(ExportImport, object):
self._handle_serial(s, oid) self._handle_serial(s, oid)
def commit_sub(self, t): def commit_sub(self, t):
"""Commit all work done in all subtransactions for this transaction.""" """Commit all changes made in subtransactions and begin 2-phase commit
"""
if self._tmp is None: if self._tmp is None:
return return
src = self._storage src = self._storage
...@@ -674,7 +380,7 @@ class Connection(ExportImport, object): ...@@ -674,7 +380,7 @@ class Connection(ExportImport, object):
self._handle_serial(s, oid, change=False) self._handle_serial(s, oid, change=False)
def abort_sub(self, t): def abort_sub(self, t):
"""Abort work done in all subtransactions for this transaction.""" """Discard all subtransaction data."""
if self._tmp is None: if self._tmp is None:
return return
src = self._storage src = self._storage
...@@ -685,7 +391,7 @@ class Connection(ExportImport, object): ...@@ -685,7 +391,7 @@ class Connection(ExportImport, object):
self._invalidate_creating(src._creating) self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None): def _invalidate_creating(self, creating=None):
"""Dissown any objects newly saved in an uncommitted transaction.""" """Disown any objects newly saved in an uncommitted transaction."""
if creating is None: if creating is None:
creating = self._creating creating = self._creating
self._creating = [] self._creating = []
...@@ -697,42 +403,6 @@ class Connection(ExportImport, object): ...@@ -697,42 +403,6 @@ class Connection(ExportImport, object):
del o._p_jar del o._p_jar
del o._p_oid del o._p_oid
def db(self):
return self._db
def getVersion(self):
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._version
def isReadOnly(self):
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects is accessed by
the current transaction, the revision written before C{tid}
will be used.
The DB calls this method, even when the Connection is closed.
:Parameters:
- `tid`: the storage-level id of the transaction that committed
- `oids`: oids is a set of oids, represented as a dict with oids
as keys.
"""
self._inv_lock.acquire()
try:
if self._txn_time is None:
self._txn_time = tid
self._invalidated.update(oids)
finally:
self._inv_lock.release()
# The next two methods are callbacks for transaction synchronization. # The next two methods are callbacks for transaction synchronization.
def beforeCompletion(self, txn): def beforeCompletion(self, txn):
...@@ -753,213 +423,35 @@ class Connection(ExportImport, object): ...@@ -753,213 +423,35 @@ class Connection(ExportImport, object):
# Now is a good time to collect some garbage # Now is a good time to collect some garbage
self._cache.incrgc() self._cache.incrgc()
def modifiedInVersion(self, oid):
try:
return self._db.modifiedInVersion(oid)
except KeyError:
return self._version
def register(self, obj):
"""Register obj with the current transaction manager.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
obj must be an object loaded from this Connection.
"""
assert obj._p_jar is self
if obj._p_oid is None:
# There is some old Zope code that assigns _p_jar
# directly. That is no longer allowed, but we need to
# provide support for old code that still does it.
# The actual complaint here is that an object without
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
deprecated36("Assigning to _p_jar is deprecated, and will be "
"changed to raise an exception.")
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
self._register(obj)
def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj)
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
def root(self): def root(self):
"""Return the database root object. """Return the database root object."""
The root is a persistent.mapping.PersistentMapping.
"""
return self.get(z64) return self.get(z64)
def setstate(self, obj): def db(self):
oid = obj._p_oid """Returns a handle to the database this connection belongs to."""
return self._db
def isReadOnly(self):
"""Returns True if the storage for this connection is read only."""
if self._storage is None: if self._storage is None:
msg = ("Shouldn't load state for %s " raise ConnectionStateError("The database connection is closed")
"when the connection is closed" % oid_repr(oid)) return self._storage.isReadOnly()
self._log.error(msg)
raise ConnectionStateError(msg)
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
self._inv_lock.acquire()
try: try:
self._setstate(obj) if self._txn_time is None:
except ConflictError: self._txn_time = tid
raise self._invalidated.update(oids)
except: finally:
self._log.error("Couldn't load state for %s", oid_repr(oid), self._inv_lock.release()
exc_info=sys.exc_info())
raise
def _setstate(self, obj):
# Helper for setstate(), which provides logging of failures.
# The control flow is complicated here to avoid loading an
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.
# If an object has been invalidated, there are several cases # IDataManager
# to consider:
# 1. Check _p_independent()
# 2. Try MVCC
# 3. Raise ConflictError.
# Does anything actually use _p_independent()? It would simplify def tpc_begin(self, transaction, sub=False):
# the code if we could drop support for it. """Begin commit of a transaction, starting the two-phase commit."""
self._modified = []
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if (obj._p_oid in self._invalidated
and not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
p, serial = self._storage.load(obj._p_oid, self._version)
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = obj._p_oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
if myhasattr(obj, "_p_independent"):
# This call will raise a ReadConflictError if something
# goes wrong
self._handle_independent(obj)
else:
self._load_before_or_conflict(obj)
return
self._reader.setGhostState(obj, p)
obj._p_serial = serial
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start
return True
def _handle_independent(self, obj):
# Helper method for setstate() handles possibly independent objects
# Call _p_independent(), if it returns True, setstate() wins.
# Otherwise, raise a ConflictError.
if obj._p_independent():
self._inv_lock.acquire()
try:
try:
del self._invalidated[obj._p_oid]
except KeyError:
pass
finally:
self._inv_lock.release()
else:
self._conflicts[obj._p_oid] = 1
self._register(obj)
raise ReadConflictError(object=obj)
def oldstate(self, obj, tid):
"""Return copy of obj that was written by tid.
The returned object does not have the typical metadata
(_p_jar, _p_oid, _p_serial) set. I'm not sure how references
to other persistent objects are handled.
:return: a persistent object
:Parameters:
- `obj`: a persistent object from this Connection.
- `tid`: id of a transaction that wrote an earlier revision.
:Exceptions:
- `KeyError`: if tid does not exist or if tid deleted a revision
of obj.
"""
assert obj._p_jar is self
p = self._storage.loadSerial(obj._p_oid, tid)
return self._reader.getState(p)
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called the cache when an object of type type is invalidated.
try:
oid = obj._p_oid
p, serial = self._storage.load(oid, self._version)
# We call getGhost(), but we actually get a non-ghost back.
# The object is a class, which can't actually be ghosted.
copy = self._reader.getGhost(p)
obj.__dict__.clear()
obj.__dict__.update(copy.__dict__)
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = 0
obj._p_serial = serial
except:
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def tpc_begin(self, transaction, sub=False):
self._modified = []
# _creating is a list of oids of new objects, which is used to # _creating is a list of oids of new objects, which is used to
# remove them from the cache if a transaction aborts. # remove them from the cache if a transaction aborts.
...@@ -972,6 +464,7 @@ class Connection(ExportImport, object): ...@@ -972,6 +464,7 @@ class Connection(ExportImport, object):
self._storage.tpc_begin(transaction) self._storage.tpc_begin(transaction)
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
"""Verify that a data manager can commit the transaction."""
try: try:
vote = self._storage.tpc_vote vote = self._storage.tpc_vote
except AttributeError: except AttributeError:
...@@ -1022,12 +515,7 @@ class Connection(ExportImport, object): ...@@ -1022,12 +515,7 @@ class Connection(ExportImport, object):
obj._p_serial = serial obj._p_serial = serial
def tpc_finish(self, transaction): def tpc_finish(self, transaction):
# It's important that the storage calls the function we pass """Indicate confirmation that the transaction is done."""
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
if self._tmp is not None: if self._tmp is not None:
# Committing a subtransaction! # Committing a subtransaction!
# There is no need to invalidate anything. # There is no need to invalidate anything.
...@@ -1044,6 +532,7 @@ class Connection(ExportImport, object): ...@@ -1044,6 +532,7 @@ class Connection(ExportImport, object):
self._tpc_cleanup() self._tpc_cleanup()
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
"""Abort a transaction."""
if self._import: if self._import:
self._import = None self._import = None
self._storage.tpc_abort(transaction) self._storage.tpc_abort(transaction)
...@@ -1055,16 +544,16 @@ class Connection(ExportImport, object): ...@@ -1055,16 +544,16 @@ class Connection(ExportImport, object):
del obj._p_jar del obj._p_jar
self._tpc_cleanup() self._tpc_cleanup()
# Common cleanup actions after tpc_finish/tpc_abort.
def _tpc_cleanup(self): def _tpc_cleanup(self):
"""Performs cleanup operations to support tpc_finish and tpc_abort."""
self._conflicts.clear() self._conflicts.clear()
if not self._synch: if not self._synch:
self._flush_invalidations() self._flush_invalidations()
self._needs_to_join = True self._needs_to_join = True
self._registered_objects = [] self._registered_objects = []
def sync(self): def sync(self):
"""Manually update the view on the database."""
self._txn_mgr.get().abort() self._txn_mgr.get().abort()
sync = getattr(self._storage, 'sync', 0) sync = getattr(self._storage, 'sync', 0)
if sync: if sync:
...@@ -1072,22 +561,304 @@ class Connection(ExportImport, object): ...@@ -1072,22 +561,304 @@ class Connection(ExportImport, object):
self._flush_invalidations() self._flush_invalidations()
def getDebugInfo(self): def getDebugInfo(self):
"""Returns a tuple with different items for debugging the
connection.
"""
return self._debug_info return self._debug_info
def setDebugInfo(self, *args): def setDebugInfo(self, *args):
"""Add the given items to the debug information of this connection."""
self._debug_info = self._debug_info + args self._debug_info = self._debug_info + args
def getTransferCounts(self, clear=False): def getTransferCounts(self, clear=False):
"""Returns the number of objects loaded and stored. """Returns the number of objects loaded and stored."""
If clear is True, reset the counters.
"""
res = self._load_count, self._store_count res = self._load_count, self._store_count
if clear: if clear:
self._load_count = 0 self._load_count = 0
self._store_count = 0 self._store_count = 0
return res return res
##############################################
# persistent.interfaces.IPersistentDatamanager
def oldstate(self, obj, tid):
"""Return copy of 'obj' that was written by transaction 'tid'.

The data for that revision is fetched with the storage's loadSerial()
and turned into a state by the connection's object reader.

Parameters:
obj: a persistent object whose _p_jar is this connection (asserted).
tid: id of the transaction that wrote the wanted revision.
"""
assert obj._p_jar is self
p = self._storage.loadSerial(obj._p_oid, tid)
return self._reader.getState(p)
def setstate(self, obj):
"""Turn the ghost 'obj' into a real object by loading its state from
the database.

Raises ConnectionStateError (after logging the message) when the
connection has no storage, i.e. it is closed.  A ConflictError raised
while loading is passed through untouched; any other exception is
logged together with its traceback and then re-raised.
"""
oid = obj._p_oid
if self._storage is None:
msg = ("Shouldn't load state for %s "
"when the connection is closed" % oid_repr(oid))
self._log.error(msg)
raise ConnectionStateError(msg)
try:
self._setstate(obj)
except ConflictError:
# Conflicts are an expected outcome; do not log them as failures.
raise
except:
self._log.error("Couldn't load state for %s", oid_repr(oid),
exc_info=sys.exc_info())
raise
def _setstate(self, obj):
# Helper for setstate(), which provides logging of failures.
# This method does the actual load and the invalidation checks.
# The control flow is complicated here to avoid loading an
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.
# If an object has been invalidated, there are several cases
# to consider:
# 1. Check _p_independent()
# 2. Try MVCC
# 3. Raise ConflictError.
# Does anything actually use _p_independent()? It would simplify
# the code if we could drop support for it.
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if (obj._p_oid in self._invalidated
and not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
# Fetch the current revision and count it in the transfer statistics
# reported by getTransferCounts().
p, serial = self._storage.load(obj._p_oid, self._version)
self._load_count += 1
# Second invalidation check, this time holding the invalidation lock.
self._inv_lock.acquire()
try:
invalid = obj._p_oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
if myhasattr(obj, "_p_independent"):
# This call will raise a ReadConflictError if something
# goes wrong
self._handle_independent(obj)
else:
self._load_before_or_conflict(obj)
return
# Install the loaded data as the object's state and remember the
# serial it was loaded with.
self._reader.setGhostState(obj, p)
obj._p_serial = serial
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError.

Non-current state is only attempted when MVCC is enabled
(self._mvcc).  When it is disabled, or _setstate_noncurrent()
reports that no suitable revision is available, the object is
registered, its oid is recorded in self._conflicts and
ReadConflictError is raised.
"""
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.

The revision is obtained from the storage's loadBefore() using
self._txn_time.  On success the object's state is set from that
revision and _p_serial is set to the revision's start tid.

Return True if state was available, False if not (loadBefore()
raised KeyError or returned None).
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
# loadBefore() hands back the data plus the tids bounding its validity.
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start
return True
def _handle_independent(self, obj):
    """Resolve an invalidation for an object that defines _p_independent().

    Helper for setstate().  If obj._p_independent() returns a true value,
    setstate() wins: the object's oid is dropped from self._invalidated
    (under the invalidation lock) and the method returns normally.

    Otherwise the oid is recorded in self._conflicts, the object is
    registered with the transaction, and ReadConflictError is raised.
    """
    if obj._p_independent():
        self._inv_lock.acquire()
        try:
            # The oid may already be gone from the dict; pop() with a
            # default makes its removal a no-op in that case.
            self._invalidated.pop(obj._p_oid, None)
        finally:
            self._inv_lock.release()
    else:
        # Use True, like _load_before_or_conflict(), so every entry in
        # self._conflicts carries the same flag value.
        self._conflicts[obj._p_oid] = True
        self._register(obj)
        raise ReadConflictError(object=obj)
def register(self, obj):
"""Register obj with the current transaction manager.

A subclass could override this method to customize the default
policy of one transaction manager for each thread.

obj must be an object loaded from this Connection (its _p_jar must
be this connection; this is asserted).  An object whose oid is already
in self._added is not registered again.
"""
assert obj._p_jar is self
if obj._p_oid is None:
# There is some old Zope code that assigns _p_jar
# directly. That is no longer allowed, but we need to
# provide support for old code that still does it.
# The actual complaint here is that an object without
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
deprecated36("Assigning to _p_jar is deprecated, and will be "
"changed to raise an exception.")
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
# Hand over to the internal helper that records the object.
self._register(obj)
def _register(self, obj=None):
# Internal registration helper.  A non-None obj is appended to
# self._registered_objects.  While self._needs_to_join is set, the
# connection joins the transaction manager's current transaction and
# the flag is cleared so the join happens only once.
if obj is not None:
self._registered_objects.append(obj)
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
# PROTECTED stuff (used by e.g. ZODB.DB.DB)
def _cache_items(self):
# Return the cache contents as a list of (key, value) pairs, with the
# entries that are not on the LRU list first, followed by the LRU
# entries in the order lru_items() reports them.
# find all items on the lru list
items = self._cache.lru_items()
# find everything. some on the lru list, some not
# NOTE(review): entries are deleted from this mapping below, so
# cache_data presumably yields a copy rather than the live cache
# dict -- confirm against PickleCache.
everything = self._cache.cache_data
# remove those items that are on the lru list
for k,v in items:
del everything[k]
# return a list of [ghosts....not recently used.....recently used]
return everything.items() + items
def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
"""Register odb, the DB that this Connection uses.

This method is called by the DB every time a Connection
is opened. Any invalidations received while the Connection
was closed will be processed.

If the global module function resetCaches() was called, the
cache will be cleared.

Parameters:
odb: database that owns the Connection
mvcc: boolean indicating whether MVCC is enabled.  None leaves
the current setting unchanged.
txn_mgr: transaction manager to use. None means
use the default transaction manager.
synch: boolean indicating whether Connection should
register for afterCompletion() calls.  None leaves the
current setting unchanged.
"""
# TODO: Why do we go to all the trouble of setting _db and
# other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB
# and Storage.
self._db = odb
self._storage = odb._storage
self.new_oid = odb._storage.new_oid
self._opened = time()
if synch is not None:
self._synch = synch
if mvcc is not None:
self._mvcc = mvcc
self._txn_mgr = txn_mgr or transaction.manager
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
if self._synch:
self._txn_mgr.registerSynch(self)
# The reader is created after any cache reset above, so it is built
# around the cache object that is current at this point.
self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory)
# Multi-database support
# Start with a mapping that contains only this connection, keyed by
# the name of its own database.
self.connections = {self._db.database_name: self}
def _resetCache(self):
"""Creates a new cache, discarding the old one.

The connection's reset counter is brought up to date with the global
one, pending invalidations are dropped, and a fresh PickleCache with
the same cache_size as the old cache is installed.

See the docstring for the resetCaches() function.
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
# Carry the configured target size over to the replacement cache.
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
# Python protocol
def __repr__(self):
    """Return '<Connection at ADDR>', naming the version when one is set.

    ADDR is positive_id(self) formatted as at least eight hex digits.
    When the connection is attached to a version, ' (in version REPR)'
    is appended, where REPR is the repr() of the version string.
    """
    if self._version:
        # repr() is the spelled-out equivalent of the backquote syntax,
        # which is deprecated and no longer exists in Python 3.
        ver = ' (in version %s)' % repr(self._version)
    else:
        ver = ''
    return '<Connection at %08x%s>' % (positive_id(self), ver)
# DEPRECATION candidates
__getitem__ = get
def modifiedInVersion(self, oid):
    """Returns the version the object with the given oid was modified in.

    If it wasn't modified in a version, the current version of this
    connection is returned.

    Parameters:
    oid: the object id to look up.

    Raises whatever getVersion() raises when the fallback is taken
    (ConnectionStateError if the connection is closed).
    """
    try:
        return self._db.modifiedInVersion(oid)
    except KeyError:
        # The database has no record for this oid: fall back to the
        # version this connection is attached to.  (A leftover
        # pdb.set_trace() used to stop the process here.)
        return self.getVersion()
def getVersion(self):
"""Returns the version this connection is attached to.

Raises ConnectionStateError if the connection has no storage, i.e.
it is closed.
"""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._version
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called by the cache when an object of type type is invalidated.
# The current data for the object is loaded and its __dict__ is
# replaced wholesale with that of the freshly loaded copy; the
# persistence attributes are then restored on obj.  Any failure is
# logged with its traceback and re-raised.
try:
oid = obj._p_oid
p, serial = self._storage.load(oid, self._version)
# We call getGhost(), but we actually get a non-ghost back.
# The object is a class, which can't actually be ghosted.
copy = self._reader.getGhost(p)
obj.__dict__.clear()
obj.__dict__.update(copy.__dict__)
# clear() above wiped these along with everything else.
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = 0
obj._p_serial = serial
except:
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def exchange(self, old, new): def exchange(self, old, new):
# called by a ZClasses method that isn't executed by the test suite # called by a ZClasses method that isn't executed by the test suite
oid = old._p_oid oid = old._p_oid
...@@ -1096,3 +867,52 @@ class Connection(ExportImport, object): ...@@ -1096,3 +867,52 @@ class Connection(ExportImport, object):
new._p_changed = 1 new._p_changed = 1
self._register(new) self._register(new)
self._cache[oid] = new self._cache[oid] = new
# DEPRECATED methods
def getTransaction(self):
"""Get the current transaction for this connection.

:deprecated:

The transaction manager's get method works the same as this
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.

Every call reports the deprecation through deprecated36() before
returning self._txn_mgr.get().
"""
deprecated36("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread.

:deprecated:

Returns the transaction manager used by the connection. You
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.

If the connection is still using the default transaction.manager, a
new transaction.TransactionManager() is installed in its place.
"""
deprecated36("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
if self._txn_mgr is transaction.manager:
# When synch is enabled, the synch registration is moved from the
# old manager to the new one.
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._txn_mgr = transaction.TransactionManager()
if self._synch:
self._txn_mgr.registerSynch(self)
return self._txn_mgr
def cacheFullSweep(self, dt=None):
"""Run a full sweep of the object cache.

:deprecated: use cacheMinimize() instead.

The deprecation is reported through deprecated36() on every call.
dt is forwarded to the cache's full_sweep() only when it is not None.
"""
deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache.

dt is a deprecated argument whose value is ignored; passing it only
causes a deprecation report through deprecated36().
"""
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
...@@ -27,6 +27,9 @@ from ZODB.serialize import referencesf ...@@ -27,6 +27,9 @@ from ZODB.serialize import referencesf
from ZODB.utils import WeakSet from ZODB.utils import WeakSet
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36 from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
from zope.interface import implements
from ZODB.interfaces import IDatabase
import transaction import transaction
logger = logging.getLogger('ZODB.DB') logger = logging.getLogger('ZODB.DB')
...@@ -178,6 +181,7 @@ class DB(object): ...@@ -178,6 +181,7 @@ class DB(object):
setCacheDeactivateAfter, setCacheDeactivateAfter,
getVersionCacheDeactivateAfter, setVersionCacheDeactivateAfter getVersionCacheDeactivateAfter, setVersionCacheDeactivateAfter
""" """
implements(IDatabase)
klass = Connection # Class to use for connections klass = Connection # Class to use for connections
_activity_monitor = None _activity_monitor = None
...@@ -188,6 +192,8 @@ class DB(object): ...@@ -188,6 +192,8 @@ class DB(object):
cache_deactivate_after=DEPRECATED_ARGUMENT, cache_deactivate_after=DEPRECATED_ARGUMENT,
version_pool_size=3, version_pool_size=3,
version_cache_size=100, version_cache_size=100,
database_name='unnamed',
databases=None,
version_cache_deactivate_after=DEPRECATED_ARGUMENT, version_cache_deactivate_after=DEPRECATED_ARGUMENT,
): ):
"""Create an object database. """Create an object database.
...@@ -248,6 +254,16 @@ class DB(object): ...@@ -248,6 +254,16 @@ class DB(object):
storage.tpc_vote(t) storage.tpc_vote(t)
storage.tpc_finish(t) storage.tpc_finish(t)
# Multi-database setup.
if databases is None:
databases = {}
self.databases = databases
self.database_name = database_name
if database_name in databases:
raise ValueError("database_name %r already in databases" %
database_name)
databases[database_name] = self
# Pass through methods: # Pass through methods:
for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog', for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
'versionEmpty', 'versions']: 'versionEmpty', 'versions']:
...@@ -565,7 +581,7 @@ class DB(object): ...@@ -565,7 +581,7 @@ class DB(object):
def get_info(c): def get_info(c):
# `result`, `time` and `version` are lexically inherited. # `result`, `time` and `version` are lexically inherited.
o = c._opened o = c._opened
d = c._debug_info d = c.getDebugInfo()
if d: if d:
if len(d) == 1: if len(d) == 1:
d = d[0] d = d[0]
......
...@@ -547,6 +547,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -547,6 +547,7 @@ class FileStorage(BaseStorage.BaseStorage,
self._lock_release() self._lock_release()
def load(self, oid, version): def load(self, oid, version):
"""Return pickle data and serial number."""
self._lock_acquire() self._lock_acquire()
try: try:
pos = self._lookup_pos(oid) pos = self._lookup_pos(oid)
......
...@@ -68,16 +68,16 @@ ...@@ -68,16 +68,16 @@
# #
# - 8-byte data length # - 8-byte data length
# #
# ? 8-byte position of non-version data # ? 8-byte position of non-version data record
# (if version length > 0) # (if version length > 0)
# #
# ? 8-byte position of previous record in this version # ? 8-byte position of previous record in this version
# (if version length > 0) # (if version length > 0)
# #
# ? version string # ? version string
# (if version length > 0) # (if version length > 0)
# #
# ? data # ? data
# (data length > 0) # (data length > 0)
# #
# ? 8-byte position of data record containing data # ? 8-byte position of data record containing data
......
...@@ -61,6 +61,9 @@ class TmpStore: ...@@ -61,6 +61,9 @@ class TmpStore:
serial = h[:8] serial = h[:8]
return self._file.read(size), serial return self._file.read(size), serial
def sortKey(self):
"""Return the sort key of the wrapped storage (self._storage)."""
return self._storage.sortKey()
# TODO: clarify difference between self._storage & self._db._storage # TODO: clarify difference between self._storage & self._db._storage
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
......
...@@ -16,14 +16,122 @@ ...@@ -16,14 +16,122 @@
$Id$ $Id$
""" """
import zope.interface from zope.interface import Interface, Attribute
class IConnection(zope.interface.Interface): class IConnection(Interface):
"""ZODB connection. """Connection to ZODB for loading and storing objects.
TODO: This interface is incomplete. The Connection object serves as a data manager. The root() method
on a Connection returns the root object for the database. This
object and all objects reachable from it are associated with the
Connection that loaded them. When a transaction commits, it uses
the Connection to store modified objects.
Typical use of ZODB is for each thread to have its own
Connection and that no thread should have more than one Connection
to the same database. A thread is associated with a Connection by
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
A Connection can be associated with a single version when it is
created. By default, a Connection is not associated with a
version; it uses non-version data.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
database. At transaction boundaries, these copies are updated to
reflect the current state of the database.
You should not instantiate this class directly; instead call the
open() method of a DB instance.
In many applications, root() is the only method of the Connection
that you will need to use.
Synchronization
---------------
A Connection instance is not thread-safe. It is designed to
support a thread model where each thread has its own transaction.
If an application has more than one thread that uses the
connection or the transaction the connection is registered with,
the application should provide locking.
The Connection manages movement of objects in and out of object
storage.
TODO: We should document an intended API for using a Connection via
multiple threads.
TODO: We should explain that the Connection has a cache and that
multiple calls to get() will return a reference to the same
object, provided that one of the earlier objects is still
referenced. Object identity is preserved within a connection, but
not across connections.
TODO: Mention the database pool.
A database connection always presents a consistent view of the
objects in the database, although it may not always present the
most current revision of any particular object. Modifications
made by concurrent transactions are not visible until the next
transaction boundary (abort or commit).
Two options affect consistency. Both the mvcc and synch options
are enabled by default.
If you pass mvcc=False to db.open(), the Connection will never read
non-current revisions of an object. Instead it will raise a
ReadConflictError to indicate that the current revision is
unavailable because it was written after the current transaction
began.
The logic for handling modifications assumes that the thread that
opened a Connection (called db.open()) is the thread that will use
the Connection. If this is not true, you should pass synch=False
to db.open(). When the synch option is disabled, some transaction
boundaries will be missed by the Connection; in particular, if a
transaction does not involve any modifications to objects loaded
from the Connection and synch is disabled, the Connection will
miss the transaction boundary. Two examples of this behavior are
db.undo() and read-only transactions.
Groups of methods:
User Methods:
root, get, add, close, db, sync, isReadOnly, cacheGC, cacheFullSweep,
cacheMinimize, getVersion, modifiedInVersion
Experimental Methods:
onCloseCallback
Database Invalidation Methods:
invalidate
Other Methods: exchange, getDebugInfo, setDebugInfo,
getTransferCounts
""" """
def __init__(version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
"""Create a new Connection.
A Connection instance should be instantiated by the DB
instance that it is connected to.
Parameters:
version: the "version" that all changes will be made in, defaults
to no version.
cache_size: the target size of the in-memory object cache, measured
in objects.
mvcc: boolean indicating whether MVCC is enabled
txn_mgr: transaction manager to use. None means use the default
transaction manager.
synch: boolean indicating whether Connection should register for
afterCompletion() calls.
"""
def add(ob): def add(ob):
"""Add a new object 'obj' to the database and assign it an oid. """Add a new object 'obj' to the database and assign it an oid.
...@@ -38,4 +146,330 @@ class IConnection(zope.interface.Interface): ...@@ -38,4 +146,330 @@ class IConnection(zope.interface.Interface):
The object is added when the transaction commits. The object The object is added when the transaction commits. The object
must implement the IPersistent interface and must not must implement the IPersistent interface and must not
already be associated with a Connection. already be associated with a Connection.
Parameters:
obj: a Persistent object
Raises TypeError if obj is not a persistent object.
Raises InvalidObjectReference if obj is already associated with another
connection.
Raises ConnectionStateError if the connection is closed.
"""
def get(oid):
"""Return the persistent object with oid 'oid'.
If the object was not in the cache and the object's class is
ghostable, then a ghost will be returned. If the object is
already in the cache, a reference to the cached object will be
returned.
Applications seldom need to call this method, because objects
are loaded transparently during attribute lookup.
Parameters:
oid: an object id
Raises KeyError if oid does not exist.
It is possible that an object does not exist as of the current
transaction, but existed in the past. It may even exist again in
the future, if the transaction that removed it is undone.
Raises ConnectionStateError if the connection is closed.
"""
def cacheMinimize():
"""Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn
it into a ghost. It is possible for individual objects to
remain active.
"""
def cacheGC():
"""Reduce cache size to target size.
Call _p_deactivate() on cached objects until the cache size
falls under the target size.
"""
def onCloseCallback(f):
"""Register a callable, f, to be called by close().
f will be called with no arguments before the Connection is closed.
Parameters:
f: method that will be called on `close`
"""
def close():
"""Close the Connection.
When the Connection is closed, all callbacks registered by
onCloseCallback() are invoked and the cache is garbage collected.
A closed Connection should not be used by client code. It can't load
or store objects. Objects in the cache are not freed, because
Connections are re-used and the cache is expected to be useful to the
next client.
"""
def db():
"""Returns a handle to the database this connection belongs to."""
def isReadOnly():
"""Returns True if the storage for this connection is read only."""
def invalidate(tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects are accessed by the
current transaction, the revision written before Connection.tid will be
used.
The DB calls this method, even when the Connection is closed.
Parameters:
tid: the storage-level id of the transaction that committed
oids: oids is a set of oids, represented as a dict with oids as keys.
""" """
def root():
"""Return the database root object.
The root is a persistent.mapping.PersistentMapping.
"""
def getVersion():
"""Returns the version this connection is attached to."""
# Multi-database support.
connections = Attribute("""\
A mapping from database name to a Connection to that database.
In multi-database use, the Connections of all members of a database
collection share the same .connections object.
In single-database use, of course this mapping contains a single
entry.
""")
# TODO: should this accept all the arguments one may pass to DB.open()?
def get_connection(database_name):
"""Return a Connection for the named database.
This is intended to be called from an open Connection associated with
a multi-database. In that case, database_name must be the name of a
database within the database collection (probably the name of a
different database than is associated with the calling Connection
instance, but it's fine to use the name of the calling Connection
object's database). A Connection for the named database is
returned. If no connection to that database is already open, a new
Connection is opened. So long as the multi-database remains open,
passing the same name to get_connection() multiple times returns the
same Connection object each time.
"""
def sync():
"""Manually update the view on the database.
This includes aborting the current transaction, getting a fresh and
consistent view of the data (synchronizing with the storage if possible)
and calling cacheGC() for this connection.
This method was especially useful in ZODB 3.2 to better support
read-only connections that were affected by a couple of problems.
"""
# Debug information
def getDebugInfo():
"""Returns a tuple with different items for debugging the connection.
Debug information can be added to a connection by using setDebugInfo.
"""
def setDebugInfo(*items):
"""Add the given items to the debug information of this connection."""
def getTransferCounts(clear=False):
"""Returns the number of objects loaded and stored.
If clear is True, reset the counters.
"""
class IDatabase(Interface):
"""ZODB DB.

Declares the DB constructor signature and the multi-database
'databases' attribute.

TODO: This interface is incomplete.
"""
def __init__(storage,
pool_size=7,
cache_size=400,
version_pool_size=3,
version_cache_size=100,
database_name='unnamed',
databases=None,
):
"""Create an object database.

storage: the storage used by the database, e.g. FileStorage
pool_size: expected maximum number of open connections
cache_size: target size of Connection object cache, in number of
objects
version_pool_size: expected maximum number of connections (per
version)
version_cache_size: target size of Connection object cache for
version connections, in number of objects
database_name: when using a multi-database, the name of this DB
within the database group. It's a (detected) error if databases
is specified too and database_name is already a key in it.
This becomes the value of the DB's database_name attribute.
databases: when using a multi-database, a mapping to use as the
binding of this DB's .databases attribute. It's intended
that the second and following DBs added to a multi-database
pass the .databases attribute set on the first DB added to the
collection.
"""
databases = Attribute("""\
A mapping from database name to DB (database) object.
In multi-database use, all DB members of a database collection share
the same .databases object.
In single-database use, of course this mapping contains a single
entry.
""")
class IStorage(Interface):
    """A storage is responsible for storing and retrieving data of objects.

    TODO: most method docstrings are still "XXX" placeholders.  Each
    method is declared exactly once; the repeated declarations of
    loadSerial() and loadBefore() have been dropped.
    """

    def load(oid, version):
        """Return pickle data and serial number for oid."""

    def close():
        """XXX"""

    def cleanup():
        """XXX"""

    def lastSerial():
        """XXX"""

    def lastTransaction():
        """XXX"""

    def lastTid(oid):
        """Return last serialno committed for object oid."""

    def loadSerial(oid, serial):
        """Return the data of oid that was written with the given serial."""

    def loadBefore(oid, tid):
        """Return the revision of oid that was current before tid.

        The result is None when no such revision is available, otherwise
        a (data, start, end) tuple.
        """

    def iterator(start=None, stop=None):
        """XXX"""

    def sortKey():
        """XXX"""

    def getName():
        """XXX"""

    def getSize():
        """XXX"""

    def history(oid, version, length=1, filter=None):
        """XXX"""

    def new_oid(last=None):
        """XXX"""

    def set_max_oid(possible_new_max_oid):
        """XXX"""

    def registerDB(db, limit):
        """XXX"""

    def isReadOnly():
        """XXX"""

    def supportsUndo():
        """XXX"""

    def supportsVersions():
        """XXX"""

    def tpc_abort(transaction):
        """XXX"""

    def tpc_begin(transaction):
        """XXX"""

    def tpc_vote(transaction):
        """XXX"""

    def tpc_finish(transaction, f=None):
        """XXX

        If f is given, the storage is expected to call it while it still
        holds its lock, so that invalidations can be sent before other
        threads can read the updated data.
        """

    def getSerial(oid):
        """XXX"""

    def getExtensionMethods():
        """XXX"""

    def copyTransactionsFrom():
        """XXX"""

    def store(oid, oldserial, data, version, transaction):
        """
        may return the new serial or not
        """
class IUndoableStorage(IStorage):
"""Storage interface that adds undo, undoInfo, undoLog and pack.

TODO: the method docstrings below are still placeholders.
"""
def undo(transaction_id, txn):
"""XXX"""
def undoInfo():
"""XXX"""
def undoLog(first, last, filter=None):
"""XXX"""
def pack(t, referencesf):
"""XXX"""
class IVersioningStorage(IStorage):
"""Storage interface that adds the version-related methods.

TODO: the method docstrings below are still placeholders.
"""
def abortVersion(src, transaction):
"""XXX"""
def commitVersion(src, dest, transaction):
"""XXX"""
def modifiedInVersion(oid):
"""XXX"""
def versionEmpty(version):
"""XXX"""
def versions(max=None):
"""XXX"""
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Multi-database tests
====================
Multi-database support adds the ability to tie multiple databases into a
collection. The original proposal is in the fishbowl:
http://www.zope.org/Wikis/ZODB/MultiDatabases/
It was implemented during the PyCon 2005 sprints, but in a simpler form,
by Jim Fulton, Christian Theune, and Tim Peters. Overview:
No private attributes were added, and one new method was introduced.
DB:
- a new .database_name attribute holds the name of this database
- a new .databases attribute maps from database name to DB object; all DBs
in a multi-database collection share the same .databases object
- the DB constructor has new optional arguments with the same names
(database_name= and databases=).
Connection:
- a new .connections attribute maps from database name to a Connection for
the database with that name; the .connections mapping object is also
shared among databases in a collection
- a new .get_connection(database_name) method returns a Connection for a
database in the collection; if a connection is already open, it's returned
(this is the value .connections[database_name]), else a new connection is
opened (and stored as .connections[database_name])
Creating a multi-database starts with creating a named DB:
>>> from ZODB.tests.test_storage import MinimalMemoryStorage
>>> from ZODB import DB
>>> dbmap = {}
>>> db = DB(MinimalMemoryStorage(), database_name='root', databases=dbmap)
The database name is accessible afterwards and in a newly created collection:
>>> db.database_name
'root'
>>> db.databases # doctest: +ELLIPSIS
{'root': <ZODB.DB.DB object at ...>}
>>> db.databases is dbmap
True
Adding another database to the collection works like this:
>>> db2 = DB(MinimalMemoryStorage(),
... database_name='notroot',
... databases=dbmap)
The new db2 now shares the 'databases' dictionary with db and has two entries:
>>> db2.databases is db.databases is dbmap
True
>>> len(db2.databases)
2
>>> names = dbmap.keys(); names.sort(); print names
['notroot', 'root']
It's an error to try to insert a database with a name already in use:
>>> db3 = DB(MinimalMemoryStorage(),
... database_name='root',
... databases=dbmap)
Traceback (most recent call last):
...
ValueError: database_name 'root' already in databases
Because that failed, db.databases wasn't changed:
>>> len(db.databases) # still 2
2
You can (still) get a connection to a database this way:
>>> cn = db.open()
>>> cn # doctest: +ELLIPSIS
<Connection at ...>
This is the only connection in this collection right now:
>>> cn.connections # doctest: +ELLIPSIS
{'root': <Connection at ...>}
Getting a connection to a different database from an existing connection in the
same database collection (this enables 'connection binding' within a given
thread/transaction/context ...):
>>> cn2 = cn.get_connection('notroot')
>>> cn2 # doctest: +ELLIPSIS
<Connection at ...>
Now there are two connections in that collection:
>>> cn2.connections is cn.connections
True
>>> len(cn2.connections)
2
>>> names = cn.connections.keys(); names.sort(); print names
['notroot', 'root']
So long as this database group remains open, the same Connection objects
are returned:
>>> cn.get_connection('root') is cn
True
>>> cn.get_connection('notroot') is cn2
True
>>> cn2.get_connection('root') is cn
True
>>> cn2.get_connection('notroot') is cn2
True
Of course trying to get a connection for a database not in the group raises
an exception:
>>> cn.get_connection('no way')
Traceback (most recent call last):
...
KeyError: 'no way'
Clean up:
>>> for a_db in dbmap.values():
... a_db.close()
...@@ -647,6 +647,8 @@ class StubDatabase: ...@@ -647,6 +647,8 @@ class StubDatabase:
self._storage = StubStorage() self._storage = StubStorage()
classFactory = None classFactory = None
database_name = 'stubdatabase'
databases = {'stubdatabase': database_name}
def invalidate(self, transaction, dict_with_oid_keys, connection): def invalidate(self, transaction, dict_with_oid_keys, connection):
pass pass
......
...@@ -15,4 +15,6 @@ ...@@ -15,4 +15,6 @@
from zope.testing.doctestunit import DocFileSuite from zope.testing.doctestunit import DocFileSuite
def test_suite(): def test_suite():
return DocFileSuite("dbopen.txt") return DocFileSuite("dbopen.txt",
"multidb.txt",
)
...@@ -257,18 +257,35 @@ class IPersistentDataManager(Interface): ...@@ -257,18 +257,35 @@ class IPersistentDataManager(Interface):
def setstate(object): def setstate(object):
"""Load the state for the given object. """Load the state for the given object.
The object should be in the ghost state. The object should be in the ghost state. The object's state will be
The object's state will be set and the object will end up set and the object will end up in the saved state.
in the saved state.
The object must provide the IPersistent interface. The object must provide the IPersistent interface.
""" """
def oldstate(obj, tid):
"""Return copy of 'obj' that was written by transaction 'tid'.
The returned object does not have the typical metadata (_p_jar, _p_oid,
_p_serial) set. I'm not sure how references to other persistent objects
are handled.
Parameters
obj: a persistent object from this Connection.
tid: id of a transaction that wrote an earlier revision.
Raises KeyError if tid does not exist or if tid deleted a revision of
obj.
"""
def register(object): def register(object):
"""Register an IPersistent with the current transaction. """Register an IPersistent with the current transaction.
This method must be called when the object transitions to This method must be called when the object transitions to
the changed state. the changed state.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
""" """
def mtime(object): def mtime(object):
......
...@@ -18,104 +18,7 @@ $Id$ ...@@ -18,104 +18,7 @@ $Id$
import zope.interface import zope.interface
class IResourceManager(zope.interface.Interface): class IDataManager(zope.interface.Interface):
"""Objects that manage resources transactionally.
These objects may manage data for other objects, or they may manage
non-object storages, such as relational databases.
IDataManagerOriginal is the interface currently provided by ZODB
database connections, but the intent is to move to the newer
IDataManager.
"""
# Two-phase commit protocol. These methods are called by the
# ITransaction object associated with the transaction being
# committed.
def tpc_begin(transaction):
"""Begin two-phase commit, to save data changes.
An implementation should do as much work as possible without
making changes permanent. Changes should be made permanent
when tpc_finish is called (or aborted if tpc_abort is called).
The work can be divided between tpc_begin() and tpc_vote(), and
the intent is that tpc_vote() be as fast as possible (to minimize
the period of uncertainty).
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def tpc_vote(transaction):
"""Verify that a resource manager can commit the transaction.
This is the last chance for a resource manager to vote 'no'. A
resource manager votes 'no' by raising an exception.
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def tpc_finish(transaction):
"""Indicate confirmation that the transaction is done.
transaction is the ITransaction instance associated with the
transaction being committed.
This should never fail. If this raises an exception, the
database is not expected to maintain consistency; it's a
serious error.
"""
def tpc_abort(transaction):
"""Abort a transaction.
transaction is the ITransaction instance associated with the
transaction being committed.
All changes made by the current transaction are aborted. Note
that this includes all changes stored in any savepoints that may
be associated with the current transaction.
tpc_abort() can be called at any time, either in or out of the
two-phase commit.
This should never fail.
"""
# The savepoint/rollback API.
def savepoint(transaction):
"""Save partial transaction changes.
There are two purposes:
1) To allow discarding partial changes without discarding all
changes.
2) To checkpoint changes to disk that would otherwise live in
memory for the duration of the transaction.
Returns an object implementing ISavePoint2 that can be used
to discard changes made since the savepoint was captured.
An implementation that doesn't support savepoints should implement
this method by returning a savepoint object that raises an
exception when its rollback method is called. The savepoint method
shouldn't raise an error. This way, transactions that create
savepoints can proceed as long as an attempt is never made to roll
back a savepoint.
"""
def discard(transaction):
"""Discard changes within the transaction since the last savepoint.
That means changes made since the last savepoint if one exists, or
since the start of the transaction.
"""
class IDataManagerOriginal(zope.interface.Interface):
"""Objects that manage transactional storage. """Objects that manage transactional storage.
These objects may manage data for other objects, or they may manage These objects may manage data for other objects, or they may manage
...@@ -155,7 +58,7 @@ class IDataManagerOriginal(zope.interface.Interface): ...@@ -155,7 +58,7 @@ class IDataManagerOriginal(zope.interface.Interface):
has been called; this is only used when the transaction is has been called; this is only used when the transaction is
being committed. being committed.
This call also implied the beginning of 2-phase commit. This call also implies the beginning of 2-phase commit.
""" """
# Two-phase commit protocol. These methods are called by the # Two-phase commit protocol. These methods are called by the
...@@ -180,10 +83,12 @@ class IDataManagerOriginal(zope.interface.Interface): ...@@ -180,10 +83,12 @@ class IDataManagerOriginal(zope.interface.Interface):
""" """
def tpc_abort(transaction): def tpc_abort(transaction):
"""Abort a transaction. """Abort a transaction.
This is called by a transaction manager to end a two-phase commit on
the data manager.
This is always called after a tpc_begin call. This is always called after a tpc_begin call.
transaction is the ITransaction instance associated with the transaction is the ITransaction instance associated with the
...@@ -202,6 +107,11 @@ class IDataManagerOriginal(zope.interface.Interface): ...@@ -202,6 +107,11 @@ class IDataManagerOriginal(zope.interface.Interface):
database is not expected to maintain consistency; it's a database is not expected to maintain consistency; it's a
serious error. serious error.
It's important that the storage calls the passed function
while it still has its lock. We don't want another thread
to be able to read any updated data until we've had a chance
to send an invalidation message to all of the other
connections!
""" """
def tpc_vote(transaction): def tpc_vote(transaction):
...@@ -214,125 +124,46 @@ class IDataManagerOriginal(zope.interface.Interface): ...@@ -214,125 +124,46 @@ class IDataManagerOriginal(zope.interface.Interface):
transaction being committed. transaction being committed.
""" """
def commit(object, transaction): def commit(transaction):
"""CCCommit changes to an object """Commit modifications to registered objects.
Save the object as part of the data to be made persistent if Save the object as part of the data to be made persistent if
the transaction commits. the transaction commits.
"""
def abort(object, transaction):
"""Abort changes to an object
Only changes made since the last transaction or
sub-transaction boundary are discarded.
This method may be called either:
o Outside of two-phase commit, or
o In the first phase of two-phase commit
"""
def sortKey():
"""
Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
class IDataManager(zope.interface.Interface):
"""Data management interface for storing objects transactionally.
ZODB database connections currently provide the older
IDataManagerOriginal interface, but the intent is to move to this newer
IDataManager interface.
Our hope is that this interface will evolve and become the standard
interface. There are some issues to be resolved first, like:
- Probably want separate abort methods for use in and out of
two-phase commit.
- The savepoint api may need some more thought.
"""
def prepare(transaction):
"""Perform the first phase of a 2-phase commit
The data manager prepares for commit any changes to be made
persistent. A normal return from this method indicates that
the data manager is ready to commit the changes.
The data manager must raise an exception if it is not prepared
to commit the transaction after executing prepare().
The transaction must match that used for preceding This includes conflict detection and handling. If no conflicts or
savepoints, if any. errors occur it saves the objects in the storage.
""" """
# This is equivalent to zodb3's tpc_begin, commit, and
# tpc_vote combined.
def abort(transaction): def abort(transaction):
"""Abort changes made by transaction """Abort a transaction and forget all changes.
This may be called before two-phase commit or in the second
phase of two-phase commit.
The transaction must match that used for preceding
savepoints, if any.
"""
# This is equivalent to *both* zodb3's abort and tpc_abort
# calls. This should probably be split into 2 methods.
def commit(transaction):
"""Finish two-phase commit
The prepare method must be called, with the same transaction,
before calling commit.
"""
# This is equivalent to zodb3's tpc_finish
def savepoint(transaction):
"""Do tentative commit of changes to this point.
Should return an object implementing IRollback that can be used
to rollback to the savepoint.
Note that (unlike zodb3) this doesn't use a 2-phase commit
protocol. If this call fails, or if a rollback call on the
result fails, the (containing) transaction should be
aborted. Aborting the containing transaction is *not* the
responsibility of the data manager, however.
An implementation that doesn't support savepoints should Abort must be called outside of a two-phase commit.
implement this method by returning a rollback implementation
that always raises an error when its rollback method is
called. The savepoint method shouldn't raise an error. This
way, transactions that create savepoints can proceed as long
as an attempt is never made to roll back a savepoint.
Abort is called by the transaction manager to abort transactions
that are not yet in a two-phase commit.
""" """
def sortKey(): def sortKey():
""" """Return a key to use for ordering registered DataManagers
Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering manager must define a sortKey() method that provides a global ordering
for resource managers. for resource managers.
""" """
# XXX: Alternate version:
#"""Return a consistent sort key for this connection.
#
#This allows ordering multiple connections that use the same storage in
#a consistent manner. This is unique for the lifetime of a connection,
#which is good enough to avoid ZEO deadlocks.
#"""
def beforeCompletion(transaction):
"""Hook that is called by the transaction before completing a commit"""
def afterCompletion(transaction):
"""Hook that is called by the transaction after completing a commit"""
class ITransaction(zope.interface.Interface): class ITransaction(zope.interface.Interface):
"""Object representing a running transaction. """Object representing a running transaction.
...@@ -414,34 +245,6 @@ class ITransaction(zope.interface.Interface): ...@@ -414,34 +245,6 @@ class ITransaction(zope.interface.Interface):
# Unsure: is this allowed to cause an exception here, during # Unsure: is this allowed to cause an exception here, during
# the two-phase commit, or can it toss data silently? # the two-phase commit, or can it toss data silently?
class ISavePoint(zope.interface.Interface):
"""ISavePoint objects represent partial transaction changes.
Sequences of savepoint objects are associated with transactions,
and with IResourceManagers.
"""
def rollback():
"""Discard changes made after this savepoint.
This includes discarding (call the discard method on) all
subsequent savepoints.
"""
def discard():
"""Discard changes saved by this savepoint.
That means changes made since the immediately preceding
savepoint if one exists, or since the start of the transaction,
until this savepoint.
Once a savepoint has been discarded, it's an error to attempt
to rollback or discard it again.
"""
next_savepoint = zope.interface.Attribute(
"""The next savepoint (later in time), or None if self is the
most recent savepoint.""")
class IRollback(zope.interface.Interface): class IRollback(zope.interface.Interface):
...@@ -457,3 +260,4 @@ class IRollback(zope.interface.Interface): ...@@ -457,3 +260,4 @@ class IRollback(zope.interface.Interface):
- The transaction has ended. - The transaction has ended.
""" """
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment