Commit a54e7bb3 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge the jeremy-txn-branch to the head.

This branch introduces a new transaction API.  The key features are:
  - top-level functions in transaction -- get(), commit(), abort()
  - explicit transaction manager objects
  - Transaction objects are used for exactly one transaction
  - support for transaction synchronizers

The changes here are still provisional, but we want to get them off an
obscure branch and onto the head for further development.
parent 0f8c1aa0
...@@ -27,6 +27,8 @@ import sys ...@@ -27,6 +27,8 @@ import sys
import threading import threading
import time import time
import transaction
from ZEO import ClientStub from ZEO import ClientStub
from ZEO.CommitLog import CommitLog from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer from ZEO.monitor import StorageStats, StatsServer
...@@ -40,7 +42,6 @@ from ZODB.ConflictResolution import ResolvedSerial ...@@ -40,7 +42,6 @@ from ZODB.ConflictResolution import ResolvedSerial
from ZODB.POSException import StorageError, StorageTransactionError from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.Transaction import Transaction
from ZODB.utils import u64, oid_repr from ZODB.utils import u64, oid_repr
_label = "ZSS" # Default label used for logging. _label = "ZSS" # Default label used for logging.
...@@ -365,7 +366,7 @@ class ZEOStorage: ...@@ -365,7 +366,7 @@ class ZEOStorage:
raise StorageTransactionError("Multiple simultaneous tpc_begin" raise StorageTransactionError("Multiple simultaneous tpc_begin"
" requests from one client.") " requests from one client.")
self.transaction = t = Transaction() self.transaction = t = transaction.Transaction()
t.id = id t.id = id
t.user = user t.user = user
t.description = description t.description = description
......
...@@ -13,10 +13,11 @@ ...@@ -13,10 +13,11 @@
############################################################################## ##############################################################################
"""Tests of the ZEO cache""" """Tests of the ZEO cache"""
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle from ZODB.tests.StorageTestBase import zodb_unpickle
from transaction import Transaction
class TransUndoStorageWithCache: class TransUndoStorageWithCache:
def checkUndoInvalidation(self): def checkUndoInvalidation(self):
......
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
import threading import threading
import time import time
from ZODB.Transaction import Transaction
from persistent.TimeStamp import TimeStamp from persistent.TimeStamp import TimeStamp
import transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage import ZEO.ClientStorage
...@@ -97,7 +97,7 @@ class CommitLockTests: ...@@ -97,7 +97,7 @@ class CommitLockTests:
self._storages = [] self._storages = []
def _start_txn(self): def _start_txn(self):
txn = Transaction() txn = transaction.Transaction()
self._storage.tpc_begin(txn) self._storage.tpc_begin(txn)
oid = self._storage.new_oid() oid = self._storage.new_oid()
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn) self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
...@@ -112,7 +112,7 @@ class CommitLockTests: ...@@ -112,7 +112,7 @@ class CommitLockTests:
for i in range(self.NUM_CLIENTS): for i in range(self.NUM_CLIENTS):
storage = self._duplicate_client() storage = self._duplicate_client()
txn = Transaction() txn = transaction.Transaction()
tid = self._get_timestamp() tid = self._get_timestamp()
t = WorkerThread(self, storage, txn) t = WorkerThread(self, storage, txn)
......
...@@ -29,13 +29,15 @@ from ZEO.zrpc.marshal import Marshaller ...@@ -29,13 +29,15 @@ from ZEO.zrpc.marshal import Marshaller
from ZEO.tests import forker from ZEO.tests import forker
from ZODB.DB import DB from ZODB.DB import DB
from ZODB.Transaction import get_transaction, Transaction
from ZODB.POSException import ReadOnlyError, ConflictError from ZODB.POSException import ReadOnlyError, ConflictError
from ZODB.tests.StorageTestBase import StorageTestBase from ZODB.tests.StorageTestBase import StorageTestBase
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase \ from ZODB.tests.StorageTestBase \
import zodb_pickle, zodb_unpickle, handle_all_serials, handle_serials import zodb_pickle, zodb_unpickle, handle_all_serials, handle_serials
import transaction
from transaction import Transaction
ZERO = '\0'*8 ZERO = '\0'*8
class TestServerStub(ZEO.ServerStub.StorageServer): class TestServerStub(ZEO.ServerStub.StorageServer):
...@@ -465,7 +467,7 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -465,7 +467,7 @@ class ConnectionTests(CommonSetupTearDown):
zLOG.LOG("checkReconnection", zLOG.INFO, zLOG.LOG("checkReconnection", zLOG.INFO,
"Error after server restart; retrying.", "Error after server restart; retrying.",
error=sys.exc_info()) error=sys.exc_info())
get_transaction().abort() transaction.abort()
# Give the other thread a chance to run. # Give the other thread a chance to run.
time.sleep(0.1) time.sleep(0.1)
zLOG.LOG("checkReconnection", zLOG.INFO, "finished") zLOG.LOG("checkReconnection", zLOG.INFO, "finished")
...@@ -552,7 +554,7 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -552,7 +554,7 @@ class ConnectionTests(CommonSetupTearDown):
r1 = c1.root() r1 = c1.root()
r1["a"] = MinPO("a") r1["a"] = MinPO("a")
get_transaction().commit() transaction.commit()
db2 = DB(self.openClientStorage()) db2 = DB(self.openClientStorage())
r2 = db2.open().root() r2 = db2.open().root()
...@@ -560,7 +562,7 @@ class ConnectionTests(CommonSetupTearDown): ...@@ -560,7 +562,7 @@ class ConnectionTests(CommonSetupTearDown):
self.assertEqual(r2["a"].value, "a") self.assertEqual(r2["a"].value, "a")
r2["b"] = MinPO("b") r2["b"] = MinPO("b")
get_transaction().commit() transaction.commit()
# make sure the invalidation is received in the other client # make sure the invalidation is received in the other client
for i in range(10): for i in range(10):
......
...@@ -16,6 +16,8 @@ import threading ...@@ -16,6 +16,8 @@ import threading
import time import time
from random import Random from random import Random
import transaction
from BTrees.check import check, display from BTrees.check import check, display
from BTrees.OOBTree import OOBTree from BTrees.OOBTree import OOBTree
...@@ -70,8 +72,7 @@ class StressTask: ...@@ -70,8 +72,7 @@ class StressTask:
self.step = step self.step = step
self.sleep = sleep self.sleep = sleep
self.added_keys = [] self.added_keys = []
self.cn = self.db.open() self.cn = self.db.open(txn_mgr=transaction.TransactionManager())
self.cn.setLocalTransaction()
self.cn.sync() self.cn.sync()
def doStep(self): def doStep(self):
......
...@@ -15,9 +15,8 @@ ...@@ -15,9 +15,8 @@
import threading import threading
from ZODB.Transaction import Transaction import transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage import ZEO.ClientStorage
ZERO = '\0'*8 ZERO = '\0'*8
...@@ -25,7 +24,7 @@ ZERO = '\0'*8 ...@@ -25,7 +24,7 @@ ZERO = '\0'*8
class BasicThread(threading.Thread): class BasicThread(threading.Thread):
def __init__(self, storage, doNextEvent, threadStartedEvent): def __init__(self, storage, doNextEvent, threadStartedEvent):
self.storage = storage self.storage = storage
self.trans = Transaction() self.trans = transaction.Transaction()
self.doNextEvent = doNextEvent self.doNextEvent = doNextEvent
self.threadStartedEvent = threadStartedEvent self.threadStartedEvent = threadStartedEvent
self.gotValueError = 0 self.gotValueError = 0
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
############################################################################## ##############################################################################
"""Database connection support """Database connection support
$Id: Connection.py,v 1.140 2004/03/16 16:18:20 jeremy Exp $""" $Id: Connection.py,v 1.141 2004/04/01 03:56:58 jeremy Exp $"""
import logging import logging
import sys import sys
...@@ -26,12 +26,13 @@ from utils import u64 ...@@ -26,12 +26,13 @@ from utils import u64
from persistent import PickleCache from persistent import PickleCache
from persistent.interfaces import IPersistent from persistent.interfaces import IPersistent
import transaction
from ZODB.ConflictResolution import ResolvedSerial from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport from ZODB.ExportImport import ExportImport
from ZODB.POSException \ from ZODB.POSException \
import ConflictError, ReadConflictError, InvalidObjectReference import ConflictError, ReadConflictError, InvalidObjectReference
from ZODB.TmpStore import TmpStore from ZODB.TmpStore import TmpStore
from ZODB.Transaction import Transaction, get_transaction
from ZODB.utils import oid_repr, z64 from ZODB.utils import oid_repr, z64
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
...@@ -123,10 +124,9 @@ class Connection(ExportImport, object): ...@@ -123,10 +124,9 @@ class Connection(ExportImport, object):
_tmp = None _tmp = None
_code_timestamp = 0 _code_timestamp = 0
_transaction = None
def __init__(self, version='', cache_size=400, def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True): cache_deactivate_after=None, mvcc=True, txn_mgr=None):
"""Create a new Connection. """Create a new Connection.
A Connection instance should by instantiated by the DB A Connection instance should by instantiated by the DB
...@@ -153,6 +153,13 @@ class Connection(ExportImport, object): ...@@ -153,6 +153,13 @@ class Connection(ExportImport, object):
self._reset_counter = global_reset_counter self._reset_counter = global_reset_counter
self._load_count = 0 # Number of objects unghosted self._load_count = 0 # Number of objects unghosted
self._store_count = 0 # Number of objects stored self._store_count = 0 # Number of objects stored
self._modified = []
# If a transaction manager is passed to the constructor, use
# it instead of the global transaction manager. The instance
# variable will hold either a TM class or the transaction
# module itself, which implements the same interface.
self._txn_mgr = txn_mgr or transaction
# _invalidated queues invalidate messages delivered from the DB # _invalidated queues invalidate messages delivered from the DB
# _inv_lock prevents one thread from modifying the set while # _inv_lock prevents one thread from modifying the set while
...@@ -187,21 +194,18 @@ class Connection(ExportImport, object): ...@@ -187,21 +194,18 @@ class Connection(ExportImport, object):
self._import = None self._import = None
def getTransaction(self): def getTransaction(self):
t = self._transaction # XXX mark this as deprecated?
if t is None: return self._txn_mgr.get()
# Fall back to thread-bound transactions
t = get_transaction()
return t
def setLocalTransaction(self): def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread""" """Use a transaction bound to the connection rather than the thread"""
# XXX mention that this should only be called when you open # XXX mark this method as depcrecated? note that it's
# a connection or at transaction boundaries (but the lattter are # signature changed?
# hard to be sure about).
if self._transaction is None: if self._txn_mgr is transaction:
self._transaction = Transaction() self._txn_mgr = transaction.TransactionManager()
return self._transaction return self._txn_mgr
def _cache_items(self): def _cache_items(self):
# find all items on the lru list # find all items on the lru list
...@@ -307,10 +311,13 @@ class Connection(ExportImport, object): ...@@ -307,10 +311,13 @@ class Connection(ExportImport, object):
assert obj._p_oid is None assert obj._p_oid is None
oid = obj._p_oid = self._storage.new_oid() oid = obj._p_oid = self._storage.new_oid()
obj._p_jar = self obj._p_jar = self
self._added[oid] = obj
if self._added_during_commit is not None: if self._added_during_commit is not None:
self._added_during_commit.append(obj) self._added_during_commit.append(obj)
self.getTransaction().register(obj) self._txn_mgr.get().register(obj)
# Add to _added after calling register(), so that _added
# can be used as a test for whether the object has been
# registered with the transaction.
self._added[oid] = obj
elif obj._p_jar is not self: elif obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar) raise InvalidObjectReference(obj, obj._p_jar)
...@@ -453,7 +460,7 @@ class Connection(ExportImport, object): ...@@ -453,7 +460,7 @@ class Connection(ExportImport, object):
except: # except what? except: # except what?
f = getattr(f, 'im_self', f) f = getattr(f, 'im_self', f)
self._log.error("Close callback failed for %s", f, self._log.error("Close callback failed for %s", f,
sys.exc_info()) exc_info=sys.exc_info())
self.__onCloseCallbacks = None self.__onCloseCallbacks = None
self._storage = self._tmp = self.new_oid = None self._storage = self._tmp = self.new_oid = None
self._debug_info = () self._debug_info = ()
...@@ -472,7 +479,6 @@ class Connection(ExportImport, object): ...@@ -472,7 +479,6 @@ class Connection(ExportImport, object):
oid = obj._p_oid oid = obj._p_oid
if oid in self._conflicts: if oid in self._conflicts:
self.getTransaction().register(obj)
raise ReadConflictError(object=obj) raise ReadConflictError(object=obj)
if oid is None or obj._p_jar is not self: if oid is None or obj._p_jar is not self:
...@@ -650,7 +656,10 @@ class Connection(ExportImport, object): ...@@ -650,7 +656,10 @@ class Connection(ExportImport, object):
# a way, this will be a very confusing warning. # a way, this will be a very confusing warning.
warnings.warn("Assigning to _p_jar is deprecated", warnings.warn("Assigning to _p_jar is deprecated",
DeprecationWarning) DeprecationWarning)
self.getTransaction().register(obj) elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
self._txn_mgr.get().register(obj)
def root(self): def root(self):
"""Return the database root object. """Return the database root object.
...@@ -729,7 +738,7 @@ class Connection(ExportImport, object): ...@@ -729,7 +738,7 @@ class Connection(ExportImport, object):
"""Load non-current state for obj or raise ReadConflictError.""" """Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)): if not (self._mvcc and self._setstate_noncurrent(obj)):
self.getTransaction().register(obj) self._txn_mgr.get().register(obj)
self._conflicts[obj._p_oid] = True self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj) raise ReadConflictError(object=obj)
...@@ -771,7 +780,7 @@ class Connection(ExportImport, object): ...@@ -771,7 +780,7 @@ class Connection(ExportImport, object):
finally: finally:
self._inv_lock.release() self._inv_lock.release()
else: else:
self.getTransaction().register(obj) self._txn_mgr.get().register(obj)
raise ReadConflictError(object=obj) raise ReadConflictError(object=obj)
def oldstate(self, obj, tid): def oldstate(self, obj, tid):
...@@ -829,19 +838,16 @@ class Connection(ExportImport, object): ...@@ -829,19 +838,16 @@ class Connection(ExportImport, object):
del obj._p_oid del obj._p_oid
del obj._p_jar del obj._p_jar
def tpc_begin(self, transaction, sub=None): def tpc_begin(self, transaction, sub=False):
self._modified = [] self._modified = []
# _creating is a list of oids of new objects, which is used to # _creating is a list of oids of new objects, which is used to
# remove them from the cache if a transaction aborts. # remove them from the cache if a transaction aborts.
self._creating = [] self._creating = []
if sub: if sub and self._tmp is None:
# Sub-transaction! # Sub-transaction!
if self._tmp is None:
_tmp = TmpStore(self._version)
self._tmp = self._storage self._tmp = self._storage
self._storage = _tmp self._storage = TmpStore(self._version, self._storage)
_tmp.registerDB(self._db, 0)
self._storage.tpc_begin(transaction) self._storage.tpc_begin(transaction)
...@@ -920,7 +926,7 @@ class Connection(ExportImport, object): ...@@ -920,7 +926,7 @@ class Connection(ExportImport, object):
self._flush_invalidations() self._flush_invalidations()
def sync(self): def sync(self):
self.getTransaction().abort() self._txn_mgr.get().abort()
sync = getattr(self._storage, 'sync', 0) sync = getattr(self._storage, 'sync', 0)
if sync: if sync:
sync() sync()
...@@ -949,5 +955,5 @@ class Connection(ExportImport, object): ...@@ -949,5 +955,5 @@ class Connection(ExportImport, object):
new._p_oid = oid new._p_oid = oid
new._p_jar = self new._p_jar = self
new._p_changed = 1 new._p_changed = 1
self.getTransaction().register(new) self._txn_mgr.get().register(new)
self._cache[oid] = new self._cache[oid] = new
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
############################################################################## ##############################################################################
"""Database objects """Database objects
$Id: DB.py,v 1.71 2004/03/16 16:28:19 jeremy Exp $""" $Id: DB.py,v 1.72 2004/04/01 03:56:58 jeremy Exp $"""
import cPickle, cStringIO, sys import cPickle, cStringIO, sys
from thread import allocate_lock from thread import allocate_lock
...@@ -23,9 +23,10 @@ import warnings ...@@ -23,9 +23,10 @@ import warnings
from ZODB.broken import find_global from ZODB.broken import find_global
from ZODB.Connection import Connection from ZODB.Connection import Connection
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.Transaction import Transaction, get_transaction
from zLOG import LOG, ERROR from zLOG import LOG, ERROR
import transaction
class DB(object): class DB(object):
"""The Object Database """The Object Database
------------------- -------------------
...@@ -132,7 +133,7 @@ class DB(object): ...@@ -132,7 +133,7 @@ class DB(object):
p = cPickle.Pickler(file, 1) p = cPickle.Pickler(file, 1)
p.dump((root.__class__, None)) p.dump((root.__class__, None))
p.dump(root.__getstate__()) p.dump(root.__getstate__())
t = Transaction() t = transaction.Transaction()
t.description = 'initial database creation' t.description = 'initial database creation'
storage.tpc_begin(t) storage.tpc_begin(t)
storage.store('\0\0\0\0\0\0\0\0', None, file.getvalue(), '', t) storage.store('\0\0\0\0\0\0\0\0', None, file.getvalue(), '', t)
...@@ -140,13 +141,12 @@ class DB(object): ...@@ -140,13 +141,12 @@ class DB(object):
storage.tpc_finish(t) storage.tpc_finish(t)
# Pass through methods: # Pass through methods:
for m in ('history', for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
'supportsUndo', 'supportsVersions', 'undoLog', 'versionEmpty', 'versions']:
'versionEmpty', 'versions'):
setattr(self, m, getattr(storage, m)) setattr(self, m, getattr(storage, m))
if hasattr(storage, 'undoInfo'): if hasattr(storage, 'undoInfo'):
self.undoInfo=storage.undoInfo self.undoInfo = storage.undoInfo
def _cacheMean(self, attr): def _cacheMean(self, attr):
...@@ -206,10 +206,10 @@ class DB(object): ...@@ -206,10 +206,10 @@ class DB(object):
self._temps=t self._temps=t
finally: self._r() finally: self._r()
def abortVersion(self, version, transaction=None): def abortVersion(self, version, txn=None):
if transaction is None: if txn is None:
transaction = get_transaction() txn = transaction.get()
transaction.register(AbortVersion(self, version)) txn.register(AbortVersion(self, version))
def cacheDetail(self): def cacheDetail(self):
"""Return information on objects in the various caches """Return information on objects in the various caches
...@@ -316,10 +316,10 @@ class DB(object): ...@@ -316,10 +316,10 @@ class DB(object):
""" """
self._storage.close() self._storage.close()
def commitVersion(self, source, destination='', transaction=None): def commitVersion(self, source, destination='', txn=None):
if transaction is None: if txn is None:
transaction = get_transaction() txn = transaction.get()
transaction.register(CommitVersion(self, source, destination)) txn.register(CommitVersion(self, source, destination))
def getCacheSize(self): def getCacheSize(self):
return self._cache_size return self._cache_size
...@@ -391,7 +391,7 @@ class DB(object): ...@@ -391,7 +391,7 @@ class DB(object):
return len(self._storage) return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None, def open(self, version='', transaction=None, temporary=0, force=None,
waitflag=1, mvcc=True): waitflag=1, mvcc=True, txn_mgr=None):
"""Return a database Connection for use by application code. """Return a database Connection for use by application code.
The optional version argument can be used to specify that a The optional version argument can be used to specify that a
...@@ -424,7 +424,7 @@ class DB(object): ...@@ -424,7 +424,7 @@ class DB(object):
# a one-use connection. # a one-use connection.
c = self.klass(version=version, c = self.klass(version=version,
cache_size=self._version_cache_size, cache_size=self._version_cache_size,
mvcc=mvcc) mvcc=mvcc, txn_mgr=txn_mgr)
c._setDB(self) c._setDB(self)
self._temps.append(c) self._temps.append(c)
if transaction is not None: if transaction is not None:
...@@ -474,13 +474,13 @@ class DB(object): ...@@ -474,13 +474,13 @@ class DB(object):
if self._version_pool_size > len(allocated) or force: if self._version_pool_size > len(allocated) or force:
c = self.klass(version=version, c = self.klass(version=version,
cache_size=self._version_cache_size, cache_size=self._version_cache_size,
mvcc=mvcc) mvcc=mvcc, txn_mgr=txn_mgr)
allocated.append(c) allocated.append(c)
pool.append(c) pool.append(c)
elif self._pool_size > len(allocated) or force: elif self._pool_size > len(allocated) or force:
c = self.klass(version=version, c = self.klass(version=version,
cache_size=self._cache_size, cache_size=self._cache_size,
mvcc=mvcc) mvcc=mvcc, txn_mgr=txn_mgr)
allocated.append(c) allocated.append(c)
pool.append(c) pool.append(c)
...@@ -611,7 +611,7 @@ class DB(object): ...@@ -611,7 +611,7 @@ class DB(object):
def cacheStatistics(self): return () # :( def cacheStatistics(self): return () # :(
def undo(self, id, transaction=None): def undo(self, id, txn=None):
"""Undo a transaction identified by id. """Undo a transaction identified by id.
A transaction can be undone if all of the objects involved in A transaction can be undone if all of the objects involved in
...@@ -625,12 +625,12 @@ class DB(object): ...@@ -625,12 +625,12 @@ class DB(object):
:Parameters: :Parameters:
- `id`: a storage-specific transaction identifier - `id`: a storage-specific transaction identifier
- `transaction`: transaction context to use for undo(). - `txn`: transaction context to use for undo().
By default, uses the current transaction. By default, uses the current transaction.
""" """
if transaction is None: if txn is None:
transaction = get_transaction() txn = transaction.get()
transaction.register(TransactionalUndo(self, id)) txn.register(TransactionalUndo(self, id))
def versionEmpty(self, version): def versionEmpty(self, version):
return self._storage.versionEmpty(version) return self._storage.versionEmpty(version)
...@@ -663,7 +663,6 @@ class ResourceManager(object): ...@@ -663,7 +663,6 @@ class ResourceManager(object):
def __init__(self, db): def __init__(self, db):
self._db = db self._db = db
# Delegate the actual 2PC methods to the storage # Delegate the actual 2PC methods to the storage
self.tpc_begin = self._db._storage.tpc_begin
self.tpc_vote = self._db._storage.tpc_vote self.tpc_vote = self._db._storage.tpc_vote
self.tpc_finish = self._db._storage.tpc_finish self.tpc_finish = self._db._storage.tpc_finish
self.tpc_abort = self._db._storage.tpc_abort self.tpc_abort = self._db._storage.tpc_abort
...@@ -671,13 +670,19 @@ class ResourceManager(object): ...@@ -671,13 +670,19 @@ class ResourceManager(object):
def sortKey(self): def sortKey(self):
return "%s:%s" % (self._db._storage.sortKey(), id(self)) return "%s:%s" % (self._db._storage.sortKey(), id(self))
def tpc_begin(self, txn, sub=False):
# XXX we should never be called with sub=True.
if sub:
raise ValueError, "doesn't supoprt sub-transactions"
self._db._storage.tpc_begin(txn)
# The object registers itself with the txn manager, so the ob # The object registers itself with the txn manager, so the ob
# argument to the methods below is self. # argument to the methods below is self.
def abort(self, ob, t): def abort(self, obj, txn):
pass pass
def commit(self, ob, t): def commit(self, obj, txn):
pass pass
class CommitVersion(ResourceManager): class CommitVersion(ResourceManager):
......
...@@ -21,7 +21,7 @@ It is meant to illustrate the simplest possible storage. ...@@ -21,7 +21,7 @@ It is meant to illustrate the simplest possible storage.
The Mapping storage uses a single data structure to map object ids to data. The Mapping storage uses a single data structure to map object ids to data.
""" """
__version__='$Revision: 1.12 $'[11:-2] __version__='$Revision: 1.13 $'[11:-2]
from ZODB import utils from ZODB import utils
from ZODB import BaseStorage from ZODB import BaseStorage
...@@ -68,6 +68,16 @@ class MappingStorage(BaseStorage.BaseStorage): ...@@ -68,6 +68,16 @@ class MappingStorage(BaseStorage.BaseStorage):
finally: finally:
self._lock_release() self._lock_release()
def getTid(self, oid):
self._lock_acquire()
try:
# The tid is the first 8 bytes of the buffer.
s = self._index[oid]
return s[:8]
finally:
self._lock_release()
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction: if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
......
...@@ -22,8 +22,9 @@ class TmpStore: ...@@ -22,8 +22,9 @@ class TmpStore:
_bver = '' _bver = ''
def __init__(self, base_version): def __init__(self, base_version, storage):
self._transaction = None self._transaction = None
self._storage = storage
if base_version: if base_version:
self._bver = base_version self._bver = base_version
self._file = tempfile.TemporaryFile() self._file = tempfile.TemporaryFile()
...@@ -34,14 +35,13 @@ class TmpStore: ...@@ -34,14 +35,13 @@ class TmpStore:
self._index = {} self._index = {}
# _tindex: map oid to pos for new updates # _tindex: map oid to pos for new updates
self._tindex = {} self._tindex = {}
self._db = None
self._creating = [] self._creating = []
def close(self): def close(self):
self._file.close() self._file.close()
def getName(self): def getName(self):
return self._db.getName() return self._storage.getName()
def getSize(self): def getSize(self):
return self._pos return self._pos
...@@ -66,14 +66,13 @@ class TmpStore: ...@@ -66,14 +66,13 @@ class TmpStore:
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
if self._index.has_key(oid): if self._index.has_key(oid):
return self._bver return self._bver
return self._db._storage.modifiedInVersion(oid) return self._storage.modifiedInVersion(oid)
def new_oid(self): def new_oid(self):
return self._db._storage.new_oid() return self._storage.new_oid()
def registerDB(self, db, limit): def registerDB(self, db, limit):
self._db = db pass
self._storage = db._storage
def store(self, oid, serial, data, version, transaction): def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction: if transaction is not self._transaction:
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
############################################################################## ##############################################################################
"""Transaction management """Transaction management
$Id: Transaction.py,v 1.58 2004/02/27 00:31:53 faassen Exp $ $Id: Transaction.py,v 1.59 2004/04/01 03:56:58 jeremy Exp $
""" """
import sys import sys
from thread import get_ident as _get_ident from thread import get_ident as _get_ident
...@@ -64,6 +64,7 @@ class Transaction: ...@@ -64,6 +64,7 @@ class Transaction:
self._id=id self._id=id
self._objects=[] self._objects=[]
self._append=self._objects.append self._append=self._objects.append
raise RuntimeError
def _init(self): def _init(self):
self._objects=[] self._objects=[]
...@@ -532,25 +533,27 @@ class DataManagerAdapter(object): ...@@ -532,25 +533,27 @@ class DataManagerAdapter(object):
############################################################################ ############################################################################
# install get_transaction: # install get_transaction:
# Map thread ident to its Transaction instance. ### Map thread ident to its Transaction instance.
_tid2tran = {} ##_tid2tran = {}
# Get Transaction associated with current thread; if none, create a ### Get Transaction associated with current thread; if none, create a
# new Transaction and return it. ### new Transaction and return it.
def get_transaction(): ##def get_transaction():
tid = _get_ident() ## tid = _get_ident()
result = _tid2tran.get(tid) ## result = _tid2tran.get(tid)
if result is None: ## if result is None:
_tid2tran[tid] = result = Transaction(tid) ## _tid2tran[tid] = result = Transaction(tid)
return result ## return result
# Forget whatever Transaction (if any) is associated with current thread. ### Forget whatever Transaction (if any) is associated with current thread.
def free_transaction(): ##def free_transaction():
tid = _get_ident() ## tid = _get_ident()
try: ## try:
del _tid2tran[tid] ## del _tid2tran[tid]
except KeyError: ## except KeyError:
pass ## pass
from transaction import get as get_transaction
import __builtin__ import __builtin__
__builtin__.get_transaction = get_transaction __builtin__.get_transaction = get_transaction
......
...@@ -19,20 +19,20 @@ http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storag ...@@ -19,20 +19,20 @@ http://www.zope.org/Documentation/Developer/Models/ZODB/ZODB_Architecture_Storag
All storages should be able to pass these tests. All storages should be able to pass these tests.
""" """
from ZODB.Transaction import Transaction
from ZODB import POSException from ZODB import POSException
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase \ from ZODB.tests.StorageTestBase \
import zodb_unpickle, zodb_pickle, handle_serials import zodb_unpickle, zodb_pickle, handle_serials
import transaction
ZERO = '\0'*8 ZERO = '\0'*8
class BasicStorage: class BasicStorage:
def checkBasics(self): def checkBasics(self):
t = Transaction() t = transaction.Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
# This should simply return # This should simply return
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
...@@ -44,10 +44,10 @@ class BasicStorage: ...@@ -44,10 +44,10 @@ class BasicStorage:
self.assertRaises( self.assertRaises(
POSException.StorageTransactionError, POSException.StorageTransactionError,
self._storage.store, self._storage.store,
0, 0, 0, 0, Transaction()) 0, 0, 0, 0, transaction.Transaction())
try: try:
self._storage.abortVersion('dummy', Transaction()) self._storage.abortVersion('dummy', transaction.Transaction())
except (POSException.StorageTransactionError, except (POSException.StorageTransactionError,
POSException.VersionCommitError): POSException.VersionCommitError):
pass # test passed ;) pass # test passed ;)
...@@ -55,7 +55,7 @@ class BasicStorage: ...@@ -55,7 +55,7 @@ class BasicStorage:
assert 0, "Should have failed, invalid transaction." assert 0, "Should have failed, invalid transaction."
try: try:
self._storage.commitVersion('dummy', 'dummer', Transaction()) self._storage.commitVersion('dummy', 'dummer', transaction.Transaction())
except (POSException.StorageTransactionError, except (POSException.StorageTransactionError,
POSException.VersionCommitError): POSException.VersionCommitError):
pass # test passed ;) pass # test passed ;)
...@@ -65,13 +65,13 @@ class BasicStorage: ...@@ -65,13 +65,13 @@ class BasicStorage:
self.assertRaises( self.assertRaises(
POSException.StorageTransactionError, POSException.StorageTransactionError,
self._storage.store, self._storage.store,
0, 1, 2, 3, Transaction()) 0, 1, 2, 3, transaction.Transaction())
self._storage.tpc_abort(t) self._storage.tpc_abort(t)
def checkSerialIsNoneForInitialRevision(self): def checkSerialIsNoneForInitialRevision(self):
eq = self.assertEqual eq = self.assertEqual
oid = self._storage.new_oid() oid = self._storage.new_oid()
txn = Transaction() txn = transaction.Transaction()
self._storage.tpc_begin(txn) self._storage.tpc_begin(txn)
# Use None for serial. Don't use _dostore() here because that coerces # Use None for serial. Don't use _dostore() here because that coerces
# serial=None to serial=ZERO. # serial=None to serial=ZERO.
...@@ -120,7 +120,7 @@ class BasicStorage: ...@@ -120,7 +120,7 @@ class BasicStorage:
def checkWriteAfterAbort(self): def checkWriteAfterAbort(self):
oid = self._storage.new_oid() oid = self._storage.new_oid()
t = Transaction() t = transaction.Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t) self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
# Now abort this transaction # Now abort this transaction
...@@ -133,7 +133,7 @@ class BasicStorage: ...@@ -133,7 +133,7 @@ class BasicStorage:
oid1 = self._storage.new_oid() oid1 = self._storage.new_oid()
revid1 = self._dostore(oid=oid1, data=MinPO(-2)) revid1 = self._dostore(oid=oid1, data=MinPO(-2))
oid = self._storage.new_oid() oid = self._storage.new_oid()
t = Transaction() t = transaction.Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t) self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
# Now abort this transaction # Now abort this transaction
...@@ -176,7 +176,7 @@ class BasicStorage: ...@@ -176,7 +176,7 @@ class BasicStorage:
def checkTwoArgBegin(self): def checkTwoArgBegin(self):
# XXX how standard is three-argument tpc_begin()? # XXX how standard is three-argument tpc_begin()?
t = Transaction() t = transaction.Transaction()
tid = '\0\0\0\0\0psu' tid = '\0\0\0\0\0psu'
self._storage.tpc_begin(t, tid) self._storage.tpc_begin(t, tid)
oid = self._storage.new_oid() oid = self._storage.new_oid()
...@@ -205,7 +205,7 @@ class BasicStorage: ...@@ -205,7 +205,7 @@ class BasicStorage:
def checkNote(self): def checkNote(self):
oid = self._storage.new_oid() oid = self._storage.new_oid()
t = Transaction() t = transaction.Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
t.note('this is a test') t.note('this is a test')
self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t) self._storage.store(oid, ZERO, zodb_pickle(MinPO(5)), '', t)
......
...@@ -13,9 +13,9 @@ ...@@ -13,9 +13,9 @@
############################################################################## ##############################################################################
"""Tests for application-level conflict resolution.""" """Tests for application-level conflict resolution."""
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError, UndoError from ZODB.POSException import ConflictError, UndoError
from persistent import Persistent from persistent import Persistent
from transaction import Transaction
from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
......
...@@ -17,8 +17,8 @@ Any storage that supports the history() method should be able to pass ...@@ -17,8 +17,8 @@ Any storage that supports the history() method should be able to pass
all these tests. all these tests.
""" """
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from transaction import Transaction
class HistoryStorage: class HistoryStorage:
def checkSimpleHistory(self): def checkSimpleHistory(self):
......
...@@ -20,8 +20,8 @@ all these tests. ...@@ -20,8 +20,8 @@ all these tests.
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
from ZODB.utils import U64, p64 from ZODB.utils import U64, p64
from ZODB.Transaction import Transaction
from transaction import Transaction
class IteratorCompare: class IteratorCompare:
......
...@@ -3,13 +3,13 @@ import sys ...@@ -3,13 +3,13 @@ import sys
import threading import threading
import time import time
import ZODB
from persistent.mapping import PersistentMapping from persistent.mapping import PersistentMapping
import transaction
import ZODB
from ZODB.tests.StorageTestBase \ from ZODB.tests.StorageTestBase \
import zodb_pickle, zodb_unpickle, handle_serials import zodb_pickle, zodb_unpickle, handle_serials
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.Transaction import Transaction
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
SHORT_DELAY = 0.01 SHORT_DELAY = 0.01
...@@ -59,6 +59,7 @@ class ZODBClientThread(TestThread): ...@@ -59,6 +59,7 @@ class ZODBClientThread(TestThread):
def runtest(self): def runtest(self):
conn = self.db.open() conn = self.db.open()
conn.sync()
root = conn.root() root = conn.root()
d = self.get_thread_dict(root) d = self.get_thread_dict(root)
if d is None: if d is None:
...@@ -126,7 +127,7 @@ class StorageClientThread(TestThread): ...@@ -126,7 +127,7 @@ class StorageClientThread(TestThread):
def dostore(self, i): def dostore(self, i):
data = zodb_pickle(MinPO((self.getName(), i))) data = zodb_pickle(MinPO((self.getName(), i)))
t = Transaction() t = transaction.Transaction()
oid = self.oid() oid = self.oid()
self.pause() self.pause()
......
...@@ -127,7 +127,7 @@ class PackableStorageBase: ...@@ -127,7 +127,7 @@ class PackableStorageBase:
try: try:
self._storage.load(ZERO, '') self._storage.load(ZERO, '')
except KeyError: except KeyError:
from ZODB.Transaction import Transaction from transaction import Transaction
file = StringIO() file = StringIO()
p = cPickle.Pickler(file, 1) p = cPickle.Pickler(file, 1)
p.dump((PersistentMapping, None)) p.dump((PersistentMapping, None))
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
# #
############################################################################## ##############################################################################
from ZODB.POSException import ReadOnlyError, Unsupported from ZODB.POSException import ReadOnlyError, Unsupported
from ZODB.Transaction import Transaction import transaction
class ReadOnlyStorage: class ReadOnlyStorage:
...@@ -47,7 +47,7 @@ class ReadOnlyStorage: ...@@ -47,7 +47,7 @@ class ReadOnlyStorage:
def checkWriteMethods(self): def checkWriteMethods(self):
self._make_readonly() self._make_readonly()
self.assertRaises(ReadOnlyError, self._storage.new_oid) self.assertRaises(ReadOnlyError, self._storage.new_oid)
t = Transaction() t = transaction.Transaction()
self.assertRaises(ReadOnlyError, self._storage.tpc_begin, t) self.assertRaises(ReadOnlyError, self._storage.tpc_begin, t)
if self._storage.supportsVersions(): if self._storage.supportsVersions():
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
############################################################################## ##############################################################################
"""More recovery and iterator tests.""" """More recovery and iterator tests."""
from ZODB.Transaction import Transaction from transaction import Transaction
from ZODB.tests.IteratorStorage import IteratorDeepCompare from ZODB.tests.IteratorStorage import IteratorDeepCompare
from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze from ZODB.tests.StorageTestBase import MinPO, zodb_unpickle, snooze
from ZODB import DB from ZODB import DB
......
...@@ -16,9 +16,10 @@ ...@@ -16,9 +16,10 @@
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import \ from ZODB.tests.StorageTestBase import \
zodb_unpickle, zodb_pickle, snooze, handle_serials zodb_unpickle, zodb_pickle, snooze, handle_serials
from ZODB.Transaction import Transaction
from ZODB.utils import p64, u64 from ZODB.utils import p64, u64
import transaction
ZERO = '\0'*8 ZERO = '\0'*8
class RevisionStorage: class RevisionStorage:
...@@ -142,7 +143,7 @@ class RevisionStorage: ...@@ -142,7 +143,7 @@ class RevisionStorage:
oid = self._storage.new_oid() oid = self._storage.new_oid()
def helper(tid, revid, x): def helper(tid, revid, x):
data = zodb_pickle(MinPO(x)) data = zodb_pickle(MinPO(x))
t = Transaction() t = transaction.Transaction()
try: try:
self._storage.tpc_begin(t, p64(tid)) self._storage.tpc_begin(t, p64(tid))
r1 = self._storage.store(oid, revid, data, '', t) r1 = self._storage.store(oid, revid, data, '', t)
......
...@@ -26,9 +26,9 @@ import unittest ...@@ -26,9 +26,9 @@ import unittest
from cPickle import Pickler, Unpickler from cPickle import Pickler, Unpickler
from cStringIO import StringIO from cStringIO import StringIO
from ZODB.Transaction import Transaction import transaction
from ZODB.utils import u64
from ZODB.utils import u64
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
ZERO = '\0'*8 ZERO = '\0'*8
...@@ -184,7 +184,7 @@ class StorageTestBase(unittest.TestCase): ...@@ -184,7 +184,7 @@ class StorageTestBase(unittest.TestCase):
if version is None: if version is None:
version = '' version = ''
# Begin the transaction # Begin the transaction
t = Transaction() t = transaction.Transaction()
if user is not None: if user is not None:
t.user = user t.user = user
if description is not None: if description is not None:
...@@ -211,7 +211,7 @@ class StorageTestBase(unittest.TestCase): ...@@ -211,7 +211,7 @@ class StorageTestBase(unittest.TestCase):
def _undo(self, tid, expected_oids=None, note=None): def _undo(self, tid, expected_oids=None, note=None):
# Undo a tid that affects a single object (oid). # Undo a tid that affects a single object (oid).
# XXX This is very specialized # XXX This is very specialized
t = Transaction() t = transaction.Transaction()
t.note(note or "undo") t.note(note or "undo")
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
tid, oids = self._storage.undo(tid, t) tid, oids = self._storage.undo(tid, t)
...@@ -224,7 +224,7 @@ class StorageTestBase(unittest.TestCase): ...@@ -224,7 +224,7 @@ class StorageTestBase(unittest.TestCase):
return self._storage.lastTransaction() return self._storage.lastTransaction()
def _commitVersion(self, src, dst): def _commitVersion(self, src, dst):
t = Transaction() t = transaction.Transaction()
t.note("commit %r to %r" % (src, dst)) t.note("commit %r to %r" % (src, dst))
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
tid, oids = self._storage.commitVersion(src, dst, t) tid, oids = self._storage.commitVersion(src, dst, t)
...@@ -233,7 +233,7 @@ class StorageTestBase(unittest.TestCase): ...@@ -233,7 +233,7 @@ class StorageTestBase(unittest.TestCase):
return oids return oids
def _abortVersion(self, ver): def _abortVersion(self, ver):
t = Transaction() t = transaction.Transaction()
t.note("abort %r" % ver) t.note("abort %r" % ver)
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
tid, oids = self._storage.abortVersion(ver, t) tid, oids = self._storage.abortVersion(ver, t)
......
...@@ -62,7 +62,7 @@ tested? Is it a general restriction? ...@@ -62,7 +62,7 @@ tested? Is it a general restriction?
""" """
from ZODB.Transaction import Transaction from transaction import Transaction
from ZODB.POSException import StorageTransactionError from ZODB.POSException import StorageTransactionError
VERSION = "testversion" VERSION = "testversion"
......
...@@ -18,13 +18,15 @@ Any storage that supports undo() must pass these tests. ...@@ -18,13 +18,15 @@ Any storage that supports undo() must pass these tests.
import time import time
import types import types
from persistent import Persistent
from transaction import Transaction
from ZODB import POSException from ZODB import POSException
from ZODB.Transaction import Transaction
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.utils import p64 from ZODB.utils import p64
from ZODB import DB from ZODB import DB
from persistent import Persistent
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle from ZODB.tests.StorageTestBase import zodb_pickle, zodb_unpickle
......
...@@ -16,8 +16,9 @@ ...@@ -16,8 +16,9 @@
import time import time
import transaction
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle from ZODB.tests.StorageTestBase import zodb_unpickle
...@@ -114,7 +115,7 @@ class TransactionalUndoVersionStorage: ...@@ -114,7 +115,7 @@ class TransactionalUndoVersionStorage:
version=version, description='version2') version=version, description='version2')
self._x_dostore(description='create2') self._x_dostore(description='create2')
t = Transaction() t = transaction.Transaction()
t.description = 'commit version' t.description = 'commit version'
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
self._storage.commitVersion(version, '', t) self._storage.commitVersion(version, '', t)
......
...@@ -18,9 +18,10 @@ Any storage that supports versions should be able to pass all these tests. ...@@ -18,9 +18,10 @@ Any storage that supports versions should be able to pass all these tests.
import time import time
from transaction import Transaction
from ZODB import POSException from ZODB import POSException
from ZODB.serialize import referencesf from ZODB.serialize import referencesf
from ZODB.Transaction import Transaction
from ZODB.tests.MinPO import MinPO from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle, snooze from ZODB.tests.StorageTestBase import zodb_unpickle, snooze
from ZODB import DB from ZODB import DB
......
...@@ -483,8 +483,6 @@ class StubStorage: ...@@ -483,8 +483,6 @@ class StubStorage:
appended to self._finished. appended to self._finished.
""" """
sortKey = 'StubStorage sortKey'
# internal # internal
_oid = 1 _oid = 1
_transaction = None _transaction = None
...@@ -502,6 +500,9 @@ class StubStorage: ...@@ -502,6 +500,9 @@ class StubStorage:
self._oid += 1 self._oid += 1
return oid return oid
def sortKey(self):
return 'StubStorage sortKey'
def tpc_begin(self, transaction): def tpc_begin(self, transaction):
if transaction is None: if transaction is None:
raise TypeError('transaction may not be None') raise TypeError('transaction may not be None')
......
...@@ -11,9 +11,9 @@ ...@@ -11,9 +11,9 @@
# FOR A PARTICULAR PURPOSE. # FOR A PARTICULAR PURPOSE.
# #
############################################################################## ##############################################################################
import ZODB.FileStorage
import os, unittest import os, unittest
from ZODB.Transaction import Transaction import transaction
import ZODB.FileStorage
from ZODB import POSException from ZODB import POSException
from ZODB.tests import StorageTestBase, BasicStorage, \ from ZODB.tests import StorageTestBase, BasicStorage, \
...@@ -195,7 +195,7 @@ class SlowFileStorageTest(BaseFileStorageTests): ...@@ -195,7 +195,7 @@ class SlowFileStorageTest(BaseFileStorageTests):
# every 8000 calls. Make sure it gets minimal coverage. # every 8000 calls. Make sure it gets minimal coverage.
oids = [[self._storage.new_oid(), None] for i in range(100)] oids = [[self._storage.new_oid(), None] for i in range(100)]
for i in range(100): for i in range(100):
t = Transaction() t = transaction.Transaction()
self._storage.tpc_begin(t) self._storage.tpc_begin(t)
for j in range(100): for j in range(100):
o = MinPO(j) o = MinPO(j)
......
...@@ -11,13 +11,7 @@ ...@@ -11,13 +11,7 @@
# FOR A PARTICULAR PURPOSE # FOR A PARTICULAR PURPOSE
# #
############################################################################## ##############################################################################
"""Test tranasction behavior for variety of cases.
"""
Revision information:
$Id: testTransaction.py,v 1.19 2004/02/27 00:31:55 faassen Exp $
"""
"""
I wrote these unittests to investigate some odd transaction I wrote these unittests to investigate some odd transaction
behavior when doing unittests of integrating non sub transaction behavior when doing unittests of integrating non sub transaction
...@@ -42,26 +36,24 @@ TODO ...@@ -42,26 +36,24 @@ TODO
add in tests for objects which are modified multiple times, add in tests for objects which are modified multiple times,
for example an object that gets modified in multiple sub txns. for example an object that gets modified in multiple sub txns.
$Id: testTransaction.py,v 1.20 2004/04/01 03:56:57 jeremy Exp $
""" """
from types import TupleType
import unittest import unittest
import transaction
from ZODB import Transaction
class TransactionTests(unittest.TestCase): class TransactionTests(unittest.TestCase):
def setUp(self): def setUp(self):
self.orig_tm = transaction.manager
Transaction.hosed = 0 transaction.manager = transaction.TransactionManager()
self.sub1 = DataObject() self.sub1 = DataObject()
self.sub2 = DataObject() self.sub2 = DataObject()
self.sub3 = DataObject() self.sub3 = DataObject()
self.nosub1 = DataObject(nost=1) self.nosub1 = DataObject(nost=1)
def tearDown(self): def tearDown(self):
transaction.manager = self.orig_tm
Transaction.free_transaction()
# basic tests with two sub trans jars # basic tests with two sub trans jars
# really we only need one, so tests for # really we only need one, so tests for
...@@ -124,18 +116,12 @@ class TransactionTests(unittest.TestCase): ...@@ -124,18 +116,12 @@ class TransactionTests(unittest.TestCase):
assert self.sub1._p_jar.cabort_sub == 1 assert self.sub1._p_jar.cabort_sub == 1
def testMultipleSubTransactionCommitCommit(self): def testMultipleSubTransactionCommitCommit(self):
# add it
self.sub1.modify() self.sub1.modify()
get_transaction().commit(1) get_transaction().commit(1)
# add another
self.sub2.modify() self.sub2.modify()
# reset a flag on the original to test it again # reset a flag on the original to test it again
self.sub1.ctpc_finish = 0 self.sub1.ctpc_finish = 0
get_transaction().commit(1) get_transaction().commit(1)
# this is interesting.. we go through # this is interesting.. we go through
...@@ -150,7 +136,7 @@ class TransactionTests(unittest.TestCase): ...@@ -150,7 +136,7 @@ class TransactionTests(unittest.TestCase):
get_transaction().commit() get_transaction().commit()
# we did an implicit sub commit, is this impl artifiact? # we did an implicit sub commit, is this impl artifact?
assert self.sub3._p_jar.ccommit_sub == 1 assert self.sub3._p_jar.ccommit_sub == 1
assert self.sub1._p_jar.ctpc_finish > 1 assert self.sub1._p_jar.ctpc_finish > 1
...@@ -350,7 +336,6 @@ class TransactionTests(unittest.TestCase): ...@@ -350,7 +336,6 @@ class TransactionTests(unittest.TestCase):
assert self.nosub1._p_jar.ctpc_finish == 0 assert self.nosub1._p_jar.ctpc_finish == 0
assert self.nosub1._p_jar.ccommit == 1 assert self.nosub1._p_jar.ccommit == 1
assert self.nosub1._p_jar.ctpc_abort == 1 assert self.nosub1._p_jar.ctpc_abort == 1
assert Transaction.hosed == 0
def testExceptionInTpcVote(self): def testExceptionInTpcVote(self):
...@@ -367,7 +352,6 @@ class TransactionTests(unittest.TestCase): ...@@ -367,7 +352,6 @@ class TransactionTests(unittest.TestCase):
assert self.nosub1._p_jar.ccommit == 1 assert self.nosub1._p_jar.ccommit == 1
assert self.nosub1._p_jar.ctpc_abort == 1 assert self.nosub1._p_jar.ctpc_abort == 1
assert self.sub1._p_jar.ctpc_abort == 1 assert self.sub1._p_jar.ctpc_abort == 1
assert Transaction.hosed == 0
def testExceptionInTpcBegin(self): def testExceptionInTpcBegin(self):
""" """
...@@ -406,29 +390,18 @@ class TransactionTests(unittest.TestCase): ...@@ -406,29 +390,18 @@ class TransactionTests(unittest.TestCase):
except TestTxnException: pass except TestTxnException: pass
assert self.nosub1._p_jar.ctpc_abort == 1 assert self.nosub1._p_jar.ctpc_abort == 1
assert Transaction.hosed == 0
### More Failure modes... ### More Failure modes...
# now we mix in some sub transactions # now we mix in some sub transactions
### ###
def testExceptionInSubCommitSub(self): def testExceptionInSubCommitSub(self):
""" # It's harder than normal to verify test results, because
this tests exhibits some odd behavior, # the subtransaction jars are stored in a dictionary. The
nothing thats technically incorrect... # order in which jars are processed depends on the order
# they come out of the dictionary.
basically it seems non deterministic, even
stranger the behavior seems dependent on what
values i test after the fact... very odd,
almost relativistic.
in-retrospect this is from the fact that
dictionaries are used to store jars at some point
"""
self.sub1.modify() self.sub1.modify()
get_transaction().commit(1) get_transaction().commit(1)
self.nosub1.modify() self.nosub1.modify()
...@@ -442,24 +415,30 @@ class TransactionTests(unittest.TestCase): ...@@ -442,24 +415,30 @@ class TransactionTests(unittest.TestCase):
try: try:
get_transaction().commit() get_transaction().commit()
except TestTxnException: pass except TestTxnException:
pass
# odd this doesn't seem to be entirely deterministic..
if self.sub1._p_jar.ccommit_sub: if self.sub1._p_jar.ccommit_sub:
assert self.sub1._p_jar.ctpc_abort == 1 self.assertEqual(self.sub1._p_jar.ctpc_abort, 1)
else: else:
assert self.sub1._p_jar.cabort_sub == 1 self.assertEqual(self.sub1._p_jar.cabort_sub, 1)
self.assertEqual(self.sub2._p_jar.ctpc_abort, 1)
self.assertEqual(self.nosub1._p_jar.ctpc_abort, 1)
if self.sub3._p_jar.ccommit_sub: if self.sub3._p_jar.ccommit_sub:
assert self.sub3._p_jar.ctpc_abort == 1 self.assertEqual(self.sub3._p_jar.ctpc_abort, 1)
else: else:
assert self.sub3._p_jar.cabort_sub == 1 self.assertEqual(self.sub3._p_jar.cabort_sub, 1)
assert self.sub2._p_jar.ctpc_abort == 1
assert self.nosub1._p_jar.ctpc_abort == 1
def testExceptionInSubAbortSub(self): def testExceptionInSubAbortSub(self):
# This test has two errors. When commit_sub() is called on
# sub1, it will fail. If sub1 is handled first, it will raise
# an except and abort_sub() will be called on sub2. If sub2
# is handled first, then commit_sub() will fail after sub2 has
# already begun its top-level transaction and tpc_abort() will
# be called.
self.sub1._p_jar = SubTransactionJar(errors='commit_sub') self.sub1._p_jar = SubTransactionJar(errors='commit_sub')
self.sub1.modify(nojar=1) self.sub1.modify(nojar=1)
get_transaction().commit(1) get_transaction().commit(1)
...@@ -482,51 +461,47 @@ class TransactionTests(unittest.TestCase): ...@@ -482,51 +461,47 @@ class TransactionTests(unittest.TestCase):
# called, then tpc_abort() should be called to abort the # called, then tpc_abort() should be called to abort the
# actual transaction. If not, then calling abort_sub() is # actual transaction. If not, then calling abort_sub() is
# sufficient. # sufficient.
if self.sub3._p_jar.ccommit_sub == 1: if self.sub3._p_jar.ccommit_sub:
self.assertEqual(self.sub3._p_jar.ctpc_abort, 1) self.assertEqual(self.sub3._p_jar.ctpc_abort, 1)
else: else:
self.assertEqual(self.sub3._p_jar.cabort_sub, 1) self.assertEqual(self.sub3._p_jar.cabort_sub, 1)
# last test, check the hosing mechanism # last test, check the hosing mechanism
def testHoserStoppage(self): ## def testHoserStoppage(self):
# XXX We should consult ZConfig to decide whether we can get into a ## # It's hard to test the "hosed" state of the database, where
# hosed state or not. ## # hosed means that a failure occurred in the second phase of
return ## # the two phase commit. It's hard because the database can
## # recover from such an error if it occurs during the very first
## # tpc_finish() call of the second phase.
# It's hard to test the "hosed" state of the database, where ## for obj in self.sub1, self.sub2:
# hosed means that a failure occurred in the second phase of ## j = HoserJar(errors='tpc_finish')
# the two phase commit. It's hard because the database can ## j.reset()
# recover from such an error if it occurs during the very first ## obj._p_jar = j
# tpc_finish() call of the second phase. ## obj.modify(nojar=1)
for obj in self.sub1, self.sub2: ## try:
j = HoserJar(errors='tpc_finish') ## get_transaction().commit()
j.reset() ## except TestTxnException:
obj._p_jar = j ## pass
obj.modify(nojar=1)
try: ## self.assert_(Transaction.hosed)
get_transaction().commit()
except TestTxnException:
pass
self.assert_(Transaction.hosed) ## self.sub2.modify()
self.sub2.modify() ## try:
## get_transaction().commit()
try: ## except Transaction.POSException.TransactionError:
get_transaction().commit() ## pass
except Transaction.POSException.TransactionError: ## else:
pass ## self.fail("Hosed Application didn't stop commits")
else:
self.fail("Hosed Application didn't stop commits")
class DataObject: class DataObject:
def __init__(self, nost=0): def __init__(self, nost=0):
self.nost= nost self.nost = nost
self._p_jar = None self._p_jar = None
def modify(self, nojar=0, tracing=0): def modify(self, nojar=0, tracing=0):
...@@ -543,7 +518,7 @@ class TestTxnException(Exception): ...@@ -543,7 +518,7 @@ class TestTxnException(Exception):
class BasicJar: class BasicJar:
def __init__(self, errors=(), tracing=0): def __init__(self, errors=(), tracing=0):
if not isinstance(errors, TupleType): if not isinstance(errors, tuple):
errors = errors, errors = errors,
self.errors = errors self.errors = errors
self.tracing = tracing self.tracing = tracing
...@@ -557,7 +532,12 @@ class BasicJar: ...@@ -557,7 +532,12 @@ class BasicJar:
self.ccommit_sub = 0 self.ccommit_sub = 0
def __repr__(self): def __repr__(self):
return "<jar %X %s>" % (id(self), self.errors) return "<%s %X %s>" % (self.__class__.__name__, id(self), self.errors)
def sortKey(self):
# All these jars use the same sort key, and Python's list.sort()
# is stable. These two
return self.__class__.__name__
def check(self, method): def check(self, method):
if self.tracing: if self.tracing:
...@@ -637,17 +617,17 @@ def test_join(): ...@@ -637,17 +617,17 @@ def test_join():
transaction.interfaces.IDataManager. transaction.interfaces.IDataManager.
>>> from ZODB.tests.sampledm import DataManager >>> from ZODB.tests.sampledm import DataManager
>>> from ZODB.Transaction import DataManagerAdapter >>> from transaction._transaction import DataManagerAdapter
>>> t = Transaction.Transaction() >>> t = transaction.Transaction()
>>> dm = DataManager() >>> dm = DataManager()
>>> t.join(dm) >>> t.join(dm)
The end result is that a data manager adapter is one of the The end result is that a data manager adapter is one of the
transaction's objects: transaction's objects:
>>> isinstance(t._objects[0], DataManagerAdapter) >>> isinstance(t._resources[0], DataManagerAdapter)
True True
>>> t._objects[0]._datamanager is dm >>> t._resources[0]._datamanager is dm
True True
""" """
......
...@@ -16,8 +16,10 @@ import unittest ...@@ -16,8 +16,10 @@ import unittest
import ZODB import ZODB
import ZODB.FileStorage import ZODB.FileStorage
from ZODB.POSException import ReadConflictError, ConflictError from ZODB.POSException import ReadConflictError, ConflictError
from persistent import Persistent from persistent import Persistent
from persistent.mapping import PersistentMapping from persistent.mapping import PersistentMapping
import transaction
class P(Persistent): class P(Persistent):
pass pass
...@@ -54,13 +56,22 @@ class ZODBTests(unittest.TestCase): ...@@ -54,13 +56,22 @@ class ZODBTests(unittest.TestCase):
self._db.close() self._db.close()
self._storage.cleanup() self._storage.cleanup()
def checkExportImport(self, abort_it=0, dup_name='test_duplicate'): def checkExportImport(self, abort_it=False):
self.populate() self.populate()
get_transaction().begin()
get_transaction().note('duplication')
# Duplicate the 'test' object.
conn = self._db.open() conn = self._db.open()
try: try:
self.duplicate(conn, abort_it)
finally:
conn.close()
conn = self._db.open()
try:
self.verify(conn, abort_it)
finally:
conn.close()
def duplicate(self, conn, abort_it):
get_transaction().begin()
get_transaction().note('duplication')
root = conn.root() root = conn.root()
ob = root['test'] ob = root['test']
assert len(ob) > 10, 'Insufficient test data' assert len(ob) > 10, 'Insufficient test data'
...@@ -71,7 +82,8 @@ class ZODBTests(unittest.TestCase): ...@@ -71,7 +82,8 @@ class ZODBTests(unittest.TestCase):
assert f.tell() > 0, 'Did not export correctly' assert f.tell() > 0, 'Did not export correctly'
f.seek(0) f.seek(0)
new_ob = ob._p_jar.importFile(f) new_ob = ob._p_jar.importFile(f)
root[dup_name] = new_ob self.assertEqual(new_ob, ob)
root['dup'] = new_ob
f.close() f.close()
if abort_it: if abort_it:
get_transaction().abort() get_transaction().abort()
...@@ -80,16 +92,13 @@ class ZODBTests(unittest.TestCase): ...@@ -80,16 +92,13 @@ class ZODBTests(unittest.TestCase):
except: except:
get_transaction().abort() get_transaction().abort()
raise raise
finally:
conn.close() def verify(self, conn, abort_it):
get_transaction().begin() get_transaction().begin()
# Verify the duplicate.
conn = self._db.open()
try:
root = conn.root() root = conn.root()
ob = root['test'] ob = root['test']
try: try:
ob2 = root[dup_name] ob2 = root['dup']
except KeyError: except KeyError:
if abort_it: if abort_it:
# Passed the test. # Passed the test.
...@@ -97,17 +106,16 @@ class ZODBTests(unittest.TestCase): ...@@ -97,17 +106,16 @@ class ZODBTests(unittest.TestCase):
else: else:
raise raise
else: else:
if abort_it: self.failUnless(not abort_it, 'Did not abort duplication')
assert 0, 'Did not abort duplication'
l1 = list(ob.items()) l1 = list(ob.items())
l1.sort() l1.sort()
l2 = list(ob2.items()) l2 = list(ob2.items())
l2.sort() l2.sort()
l1 = map(lambda (k, v): (k, v[0]), l1) l1 = map(lambda (k, v): (k, v[0]), l1)
l2 = map(lambda (k, v): (k, v[0]), l2) l2 = map(lambda (k, v): (k, v[0]), l2)
assert l1 == l2, 'Duplicate did not match' self.assertEqual(l1, l2)
assert ob._p_oid != ob2._p_oid, 'Did not duplicate' self.assert_(ob._p_oid != ob2._p_oid)
assert ob._p_jar == ob2._p_jar, 'Not same connection' self.assertEqual(ob._p_jar, ob2._p_jar)
oids = {} oids = {}
for v in ob.values(): for v in ob.values():
oids[v._p_oid] = 1 oids[v._p_oid] = 1
...@@ -115,11 +123,9 @@ class ZODBTests(unittest.TestCase): ...@@ -115,11 +123,9 @@ class ZODBTests(unittest.TestCase):
assert not oids.has_key(v._p_oid), ( assert not oids.has_key(v._p_oid), (
'Did not fully separate duplicate from original') 'Did not fully separate duplicate from original')
get_transaction().commit() get_transaction().commit()
finally:
conn.close()
def checkExportImportAborted(self): def checkExportImportAborted(self):
self.checkExportImport(abort_it=1, dup_name='test_duplicate_aborted') self.checkExportImport(abort_it=True)
def checkVersionOnly(self): def checkVersionOnly(self):
# Make sure the changes to make empty transactions a no-op # Make sure the changes to make empty transactions a no-op
...@@ -160,6 +166,44 @@ class ZODBTests(unittest.TestCase): ...@@ -160,6 +166,44 @@ class ZODBTests(unittest.TestCase):
conn._setDB(self._db) # simulate the connection being reopened conn._setDB(self._db) # simulate the connection being reopened
self.assertEqual(len(conn._cache), 0) self.assertEqual(len(conn._cache), 0)
def checkExplicitTransactionManager(self):
# Test of transactions that apply to only the connection,
# not the thread.
tm1 = transaction.TransactionManager()
conn1 = self._db.open(txn_mgr=tm1)
tm2 = transaction.TransactionManager()
conn2 = self._db.open(txn_mgr=tm2)
try:
r1 = conn1.root()
r2 = conn2.root()
if r1.has_key('item'):
del r1['item']
tm1.get().commit()
r1.get('item')
r2.get('item')
r1['item'] = 1
tm1.get().commit()
self.assertEqual(r1['item'], 1)
# r2 has not seen a transaction boundary,
# so it should be unchanged.
self.assertEqual(r2.get('item'), None)
conn2.sync()
# Now r2 is updated.
self.assertEqual(r2['item'], 1)
# Now, for good measure, send an update in the other direction.
r2['item'] = 2
tm2.get().commit()
self.assertEqual(r1['item'], 1)
self.assertEqual(r2['item'], 2)
conn1.sync()
conn2.sync()
self.assertEqual(r1['item'], 2)
self.assertEqual(r2['item'], 2)
finally:
conn1.close()
conn2.close()
def checkLocalTransactions(self): def checkLocalTransactions(self):
# Test of transactions that apply to only the connection, # Test of transactions that apply to only the connection,
# not the thread. # not the thread.
......
...@@ -41,8 +41,9 @@ We will use two different connections with the experimental ...@@ -41,8 +41,9 @@ We will use two different connections with the experimental
setLocalTransaction() method to make sure that the connections act setLocalTransaction() method to make sure that the connections act
independently, even though they'll be run from a single thread. independently, even though they'll be run from a single thread.
>>> cn1 = db.open() >>> import transaction
>>> txn1 = cn1.setLocalTransaction() >>> tm1 = transaction.TransactionManager()
>>> cn1 = db.open(txn_mgr=tm1)
The test will just use some MinPO objects. The next few lines just The test will just use some MinPO objects. The next few lines just
setup an initial database state. setup an initial database state.
...@@ -51,12 +52,12 @@ setup an initial database state. ...@@ -51,12 +52,12 @@ setup an initial database state.
>>> r = cn1.root() >>> r = cn1.root()
>>> r["a"] = MinPO(1) >>> r["a"] = MinPO(1)
>>> r["b"] = MinPO(1) >>> r["b"] = MinPO(1)
>>> txn1.commit() >>> tm1.get().commit()
Now open a second connection. Now open a second connection.
>>> cn2 = db.open() >>> tm2 = transaction.TransactionManager()
>>> txn2 = cn2.setLocalTransaction() >>> cn2 = db.open(txn_mgr=tm2)
Connection high-water mark Connection high-water mark
-------------------------- --------------------------
...@@ -104,7 +105,7 @@ will modify "a." The other transaction will then modify "b" and commit. ...@@ -104,7 +105,7 @@ will modify "a." The other transaction will then modify "b" and commit.
>>> r1 = cn1.root() >>> r1 = cn1.root()
>>> r1["a"].value = 2 >>> r1["a"].value = 2
>>> cn1.getTransaction().commit() >>> tm1.get().commit()
>>> txn = db.lastTransaction() >>> txn = db.lastTransaction()
The second connection has its high-water mark set now. The second connection has its high-water mark set now.
...@@ -141,7 +142,7 @@ It's possible to modify "a", but we get a conflict error when we ...@@ -141,7 +142,7 @@ It's possible to modify "a", but we get a conflict error when we
commit the transaction. commit the transaction.
>>> r2["a"].value = 3 >>> r2["a"].value = 3
>>> txn2.commit() >>> tm2.get().commit()
Traceback (most recent call last): Traceback (most recent call last):
... ...
ConflictError: database conflict error (oid 0000000000000001, class ZODB.tests.MinPO.MinPO) ConflictError: database conflict error (oid 0000000000000001, class ZODB.tests.MinPO.MinPO)
...@@ -155,9 +156,7 @@ None ...@@ -155,9 +156,7 @@ None
>>> r1 = cn1.root() >>> r1 = cn1.root()
>>> r1["a"].value = 3 >>> r1["a"].value = 3
>>> txn1 is cn1.getTransaction() >>> tm1.get().commit()
True
>>> cn1.getTransaction().commit()
>>> txn = db.lastTransaction() >>> txn = db.lastTransaction()
>>> cn2._txn_time == txn >>> cn2._txn_time == txn
True True
...@@ -165,7 +164,7 @@ True ...@@ -165,7 +164,7 @@ True
>>> r2["b"].value = r2["a"].value + 1 >>> r2["b"].value = r2["a"].value + 1
>>> r2["b"].value >>> r2["b"].value
3 3
>>> txn2.commit() >>> tm2.get().commit()
>>> print cn2._txn_time >>> print cn2._txn_time
None None
...@@ -185,7 +184,7 @@ First get the database back in an initial state. ...@@ -185,7 +184,7 @@ First get the database back in an initial state.
>>> cn1.sync() >>> cn1.sync()
>>> r1["a"].value = 0 >>> r1["a"].value = 0
>>> r1["b"].value = 0 >>> r1["b"].value = 0
>>> cn1.getTransaction().commit() >>> tm1.get().commit()
>>> cn2.sync() >>> cn2.sync()
>>> r2["a"].value >>> r2["a"].value
...@@ -206,7 +205,7 @@ should all have the same effect on non-current objects in cache. ...@@ -206,7 +205,7 @@ should all have the same effect on non-current objects in cache.
... cn1.sync() ... cn1.sync()
... r1["a"].value = 0 ... r1["a"].value = 0
... r1["b"].value = 0 ... r1["b"].value = 0
... cn1.getTransaction().commit() ... tm1.get().commit()
... cn2.sync() ... cn2.sync()
... r2["b"].value = 1 ... r2["b"].value = 1
... cn2.getTransaction().commit() ... cn2.getTransaction().commit()
...@@ -217,7 +216,7 @@ should all have the same effect on non-current objects in cache. ...@@ -217,7 +216,7 @@ should all have the same effect on non-current objects in cache.
>>> r1["b"].value >>> r1["b"].value
0 0
>>> r1["a"].value = 1 >>> r1["a"].value = 1
>>> cn1.getTransaction().commit() >>> tm1.get().commit()
>>> r1["b"]._p_state >>> r1["b"]._p_state
-1 -1
...@@ -280,14 +279,13 @@ non-current revision to load. ...@@ -280,14 +279,13 @@ non-current revision to load.
>>> ts = TestStorage() >>> ts = TestStorage()
>>> db = DB(ts) >>> db = DB(ts)
>>> cn1 = db.open() >>> cn1 = db.open(txn_mgr=tm1)
>>> txn1 = cn1.setLocalTransaction()
>>> r1 = cn1.root() >>> r1 = cn1.root()
>>> r1["a"] = MinPO(0) >>> r1["a"] = MinPO(0)
>>> r1["b"] = MinPO(0) >>> r1["b"] = MinPO(0)
>>> cn1.getTransaction().commit() >>> tm1.get().commit()
>>> r1["b"].value = 1 >>> r1["b"].value = 1
>>> cn1.getTransaction().commit() >>> tm1.get().commit()
>>> cn1.cacheMinimize() # makes everything in cache a ghost >>> cn1.cacheMinimize() # makes everything in cache a ghost
>>> oid = r1["b"]._p_oid >>> oid = r1["b"]._p_oid
...@@ -318,12 +316,11 @@ activate "b" will result in a ReadConflictError. ...@@ -318,12 +316,11 @@ activate "b" will result in a ReadConflictError.
>>> ts = TestStorage() >>> ts = TestStorage()
>>> db = DB(ts) >>> db = DB(ts)
>>> cn1 = db.open() >>> cn1 = db.open(txn_mgr=tm1)
>>> txn1 = cn1.setLocalTransaction()
>>> r1 = cn1.root() >>> r1 = cn1.root()
>>> r1["a"] = MinPO(0) >>> r1["a"] = MinPO(0)
>>> r1["b"] = MinPO(0) >>> r1["b"] = MinPO(0)
>>> cn1.getTransaction().commit() >>> tm1.get().commit()
>>> cn1.cacheMinimize() # makes everything in cache a ghost >>> cn1.cacheMinimize() # makes everything in cache a ghost
>>> oid = r1["b"]._p_oid >>> oid = r1["b"]._p_oid
......
...@@ -11,3 +11,4 @@ is syntactically simple, but semantically complex. The semantics ...@@ -11,3 +11,4 @@ is syntactically simple, but semantically complex. The semantics
were not easy to express in the interface. This could probably use were not easy to express in the interface. This could probably use
more work. The semantics are presented in detail through examples of more work. The semantics are presented in detail through examples of
a sample data manager in transaction.tests.test_SampleDataManager. a sample data manager in transaction.tests.test_SampleDataManager.
...@@ -12,5 +12,22 @@ ...@@ -12,5 +12,22 @@
# #
############################################################################ ############################################################################
from ZODB.Transaction import get_transaction from transaction._transaction import Transaction
from transaction._manager import TransactionManager, ThreadTransactionManager
manager = ThreadTransactionManager()
def get():
return manager.get()
def begin():
return manager.begin()
def commit():
manager.get().commit()
def abort():
manager.get().abort()
# XXX Issue deprecation warning if this variant is used?
get_transaction = get
############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
"""A TransactionManager controls transaction boundaries.
It coordinates application code and resource managers, so that they
are associated with the right transaction.
"""
import thread
from transaction._transaction import Transaction
class TransactionManager(object):
def __init__(self):
self._txn = None
self._synchs = []
def begin(self):
if self._txn is not None:
self._txn.abort()
self._txn = Transaction(self._synchs, self)
return self._txn
def get(self):
if self._txn is None:
self._txn = Transaction(self._synchs, self)
return self._txn
def free(self, txn):
assert txn is self._txn
self._txn = None
def registerSynch(self, synch):
self.synchs.append(synch)
def unregisterSynch(self, synch):
self._synchs.remove(synch)
class ThreadTransactionManager(object):
"""Thread-aware transaction manager.
Each thread is associated with a unique transaction.
"""
def __init__(self):
# _threads maps thread ids to transactions
self._txns = {}
# _synchs maps a thread id to a list of registered synchronizers.
# The list is passed to the Transaction constructor, because
# it needs to call the synchronizers when it commits.
self._synchs = {}
def begin(self):
tid = thread.get_ident()
txn = self._txns.get(tid)
if txn is not None:
txn.abort()
txn = self._txns[tid] = Transaction(self._synchs.get(tid), self)
return txn
def get(self):
tid = thread.get_ident()
txn = self._txns.get(tid)
if txn is None:
txn = self._txns[tid] = Transaction(self._synchs.get(tid), self)
return txn
def free(self, txn):
tid = thread.get_ident()
assert txn is self._txns.get(tid)
del self._txns[tid]
def registerSynch(self, synch):
tid = thread.get_ident()
L = self._synchs.setdefault(tid, [])
L.append(synch)
def unregisterSynch(self, synch):
tid = thread.get_ident()
L = self._synchs.get(tid)
L.remove(synch)
This diff is collapsed.
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test backwards compatibility for resource managers using register().
The transaction package supports several different APIs for resource
managers. The original ZODB3 API was implemented by ZODB.Connection.
The Connection passed persistent objects to a Transaction's register()
method. It's possible that third-party code also used this API, hence
these tests that the code that adapts the old interface to the current
API works.
These tests use a TestConnection object that implements the old API.
They check that the right methods are called and in roughly the right
order.
Common cases
------------
First, check that a basic transaction commit works.
>>> cn = TestConnection()
>>> cn.register(Object())
>>> cn.register(Object())
>>> cn.register(Object())
>>> transaction.commit()
>>> len(cn.committed)
3
>>> len(cn.aborted)
0
>>> cn.calls
['begin', 'vote', 'finish']
Second, check that a basic transaction abort works. If the
application calls abort(), then the transaction never gets into the
two-phase commit. It just aborts each object.
>>> cn = TestConnection()
>>> cn.register(Object())
>>> cn.register(Object())
>>> cn.register(Object())
>>> transaction.abort()
>>> len(cn.committed)
0
>>> len(cn.aborted)
3
>>> cn.calls
[]
Error handling
--------------
The tricky part of the implementation is recovering from an error that
occurs during the two-phase commit. We override the commit() and
abort() methods of Object to cause errors during commit.
Note that the implementation uses lists internally, so that objects
are committed in the order they are registered. (In the presence of
multiple resource managers, objects from a single resource manager are
committed in order. The order of resource managers depends on
sortKey().) I'm not sure if this is an accident of the implementation
or a feature that should be supported by any implementation.
>>> cn = TestConnection()
>>> cn.register(Object())
>>> cn.register(CommitError())
>>> cn.register(Object())
>>> transaction.commit()
Traceback (most recent call last):
...
RuntimeError: commit
>>> len(cn.committed)
1
>>> len(cn.aborted)
2
"""
import transaction
class Object(object):
def commit(self):
pass
def abort(self):
pass
class CommitError(Object):
def commit(self):
raise RuntimeError("commit")
class AbortError(Object):
def abort(self):
raise RuntimeError("abort")
class BothError(CommitError, AbortError):
pass
class TestConnection:
def __init__(self):
self.committed = []
self.aborted = []
self.calls = []
def register(self, obj):
obj._p_jar = self
transaction.get().register(obj)
def sortKey(self):
return str(id(self))
def tpc_begin(self, txn, sub):
self.calls.append("begin")
def tpc_vote(self, txn):
self.calls.append("vote")
def tpc_finish(self, txn):
self.calls.append("finish")
def tpc_abort(self, txn):
self.calls.append("abort")
def commit(self, obj, txn):
obj.commit()
self.committed.append(obj)
def abort(self, obj, txn):
obj.abort()
self.aborted.append(obj)
import doctest
def test_suite():
return doctest.DocTestSuite()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment