Commit dd80d5c9 authored by Jim Fulton's avatar Jim Fulton

Fixed a bug that could cause database consistency problems

in cases where a transaction is aborted due to a storage error
(e.g. ConflictError) during two-phase commit and new objects
have been stored. The new objects were left in a state where they
incorrectly appeared to have been saved and be up to date.

Also removed unnecessary invalidation of objects in other connections
when commiting sub-transactions.
parent 962c3ca1
......@@ -84,8 +84,8 @@
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.43 2001/01/15 18:49:49 jim Exp $"""
__version__='$Revision: 1.43 $'[11:-2]
$Id: Connection.py,v 1.44 2001/01/18 18:21:19 jim Exp $"""
__version__='$Revision: 1.44 $'[11:-2]
from cPickleCache import PickleCache
from POSException import ConflictError, ExportError
......@@ -260,9 +260,11 @@ class Connection(ExportImport.ExportImport):
oid=object._p_oid
invalid=self._invalid
if oid is None or object._p_jar is not self:
# new object
oid = self.new_oid()
object._p_jar=self
object._p_oid=oid
self._creating.append(oid)
elif object._p_changed:
if invalid(oid) or invalid(None): raise ConflictError, `oid`
......@@ -321,7 +323,14 @@ class Connection(ExportImport.ExportImport):
del stack[-1]
oid=object._p_oid
serial=getattr(object, '_p_serial', '\0\0\0\0\0\0\0\0')
if invalid(oid): raise ConflictError, `oid`
if serial == '\0\0\0\0\0\0\0\0':
# new object
self._creating.append(oid)
else:
#XXX We should never get here
if invalid(oid) or invalid(None): raise ConflictError, `oid`
self._invalidating.append(oid)
klass = object.__class__
if klass is ExtensionKlass:
......@@ -390,8 +399,12 @@ class Connection(ExportImport.ExportImport):
dest=self._version
get=self._cache.get
oids=src._index.keys()
# Copy invalidating and creating info from temporary storage:
invalidating=self._invalidating
invalidating[len(invalidating):]=oids
creating=self._creating
creating[len(creating):]=src._creating
for oid in oids:
data, serial = load(oid, src)
......@@ -408,12 +421,34 @@ class Connection(ExportImport.ExportImport):
def abort_sub(self, t):
"""Abort work done in subtransactions"""
tmp=self._tmp
if tmp is None: return
src=self._storage
self._tmp=None
self._storage=tmp
self._cache.invalidate(src._index.keys())
self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None):
"""Dissown any objects newly saved in an uncommitted transaction.
"""
if creating is None:
creating=self._creating
self._creating=[]
cache=self._cache
cache_get=cache.get
for oid in creating:
o=cache_get(oid, None)
if o is not None:
del o._p_jar
del o._p_oid
del cache[oid]
#XXX
def db(self): return self._db
......@@ -520,11 +555,13 @@ class Connection(ExportImport.ExportImport):
cache=self._cache
cache.invalidate(self._invalidated)
cache.invalidate(self._invalidating)
self._invalidate_creating()
def tpc_begin(self, transaction, sub=None):
if self._invalid(None): # Some nitwit invalidated everything!
raise ConflictError, "transaction already invalidated"
self._invalidating=[]
self._creating=[]
if sub:
# Sub-transaction!
......@@ -559,15 +596,30 @@ class Connection(ExportImport.ExportImport):
# want another thread to be able to read any updated data
# until we've had a chance to send an invalidation message to
# all of the other connections!
self._storage.tpc_finish(transaction, self.tpc_finish_)
if self._tmp is not None:
# Commiting a subtransaction!
# There is no need to invalidate anything.
self._storage.tpc_finish(transaction, self._invalidate_sub)
self._storage._creating[:0]=self._creating
del self._creating[:]
else:
self._storage.tpc_finish(transaction,
self._invalidate_invalidating)
self._cache.invalidate(self._invalidated)
self._incrgc() # This is a good time to do some GC
def tpc_finish_(self):
def _invalidate_invalidating(self):
invalidate=self._db.invalidate
for oid in self._invalidating: invalidate(oid, self)
def _invalidate_sub(self):
# There's no point in invalidating any objects in a subtransaction
#
# Because we may ultimately abort the containing transaction.
pass
def sync(self):
get_transaction().abort()
sync=getattr(self._storage, 'sync', 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment