Commit 3e3800a2 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge Standby-branch to trunk (mostly).

The Standby-branch was branched from the StandaloneZODB-1_0-branch,
which includes the BTrees-fsIndex code.  I didn't include that change
in the merge, but everything else.  Terse summary follows:

BaseStorage.py:
    Add read-only storage feature.
    Add TransactionRecord and DataRecord marker classes for iteration.
    Reformat some lines.

FileStorage.py:
    Add read-only storage feature.
    Greg Ward's ConflictError patch
    Reformat some lines.
    Add lastTransaction(), lastSerialno().
    Add bounds support to iterator().
    Use TransactionRecord and DataRecord.

Connection.py:
DemoStorage.py:
MappingStorage.py:
    Greg Ward's ConflictError patch

POSException.py:
    Greg Ward's ConflictError patch
    Add ReadOnlyError.
parent 0431ca7b
......@@ -14,7 +14,7 @@
"""
# Do this portably in the face of checking out with -kv
import string
__version__ = string.split('$Revision: 1.16 $')[-2:][0]
__version__ = string.split('$Revision: 1.17 $')[-2:][0]
import ThreadLock, bpthread
import time, UndoLogCompatible
......@@ -26,6 +26,7 @@ class BaseStorage(UndoLogCompatible.UndoLogCompatible):
_transaction=None # Transaction that is being committed
_serial=z64 # Transaction serial number
_tstatus=' ' # Transaction status, used for copying data
_is_read_only = 0
def __init__(self, name, base=None):
......@@ -42,8 +43,10 @@ class BaseStorage(UndoLogCompatible.UndoLogCompatible):
t=time.time()
t=self._ts=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
self._serial=`t`
if base is None: self._oid='\0\0\0\0\0\0\0\0'
else: self._oid=base._oid
if base is None:
self._oid='\0\0\0\0\0\0\0\0'
else:
self._oid=base._oid
def abortVersion(self, src, transaction):
if transaction is not self._transaction:
......@@ -55,15 +58,24 @@ class BaseStorage(UndoLogCompatible.UndoLogCompatible):
raise POSException.StorageTransactionError(self, transaction)
return []
def close(self): pass
def close(self):
pass
def getName(self): return self.__name__
def getSize(self): return len(self)*300 # WAG!
def history(self, oid, version, length=1): pass
def getName(self):
return self.__name__
def getSize(self):
return len(self)*300 # WAG!
def history(self, oid, version, length=1):
pass
def modifiedInVersion(self, oid): return ''
def modifiedInVersion(self, oid):
return ''
def new_oid(self, last=None):
if self._is_read_only:
raise POSException.ReadOnlyError()
if last is None:
self._lock_acquire()
try:
......@@ -79,10 +91,17 @@ class BaseStorage(UndoLogCompatible.UndoLogCompatible):
if d < 255: return last[:-1]+chr(d+1)+'\0'*(8-len(last))
else: return self.new_oid(last[:-1])
def registerDB(self, db, limit): pass # we don't care
def supportsUndo(self): return 0
def supportsVersions(self): return 0
def registerDB(self, db, limit):
pass # we don't care
def isReadOnly(self):
return self._is_read_only
def supportsUndo(self):
return 0
def supportsVersions(self):
return 0
def tpc_abort(self, transaction):
self._lock_acquire()
......@@ -171,15 +190,22 @@ class BaseStorage(UndoLogCompatible.UndoLogCompatible):
pass
def undo(self, transaction_id):
if self._is_read_only:
raise POSException.ReadOnlyError()
raise POSException.UndoError, 'non-undoable transaction'
def undoLog(self, first, last, filter=None): return ()
def undoLog(self, first, last, filter=None):
return ()
def versionEmpty(self, version): return 1
def versionEmpty(self, version):
return 1
def versions(self, max=None): return ()
def versions(self, max=None):
return ()
def pack(self, t, referencesf): pass
def pack(self, t, referencesf):
if self._is_read_only:
raise POSException.ReadOnlyError()
def getSerial(self, oid):
self._lock_acquire()
......@@ -232,3 +258,9 @@ class BaseStorage(UndoLogCompatible.UndoLogCompatible):
self.tpc_vote(transaction)
self.tpc_finish(transaction)
class TransactionRecord:
"""Abstract base class for iterator protocol"""
class DataRecord:
"""Abstract base class for iterator protocol"""
......@@ -12,11 +12,11 @@
##############################################################################
"""Database connection support
$Id: Connection.py,v 1.61 2001/11/28 15:51:18 matt Exp $"""
__version__='$Revision: 1.61 $'[11:-2]
$Id: Connection.py,v 1.62 2002/01/17 17:34:33 jeremy Exp $"""
__version__='$Revision: 1.62 $'[11:-2]
from cPickleCache import PickleCache
from POSException import ConflictError
from POSException import ConflictError, ReadConflictError
from ExtensionClass import Base
import ExportImport, TmpStore
from zLOG import LOG, ERROR, BLATHER
......@@ -248,7 +248,7 @@ class Connection(ExportImport.ExportImport):
or
invalid(None)
):
raise ConflictError, `oid`
raise ConflictError(object=object)
self._invalidating.append(oid)
else:
......@@ -315,7 +315,7 @@ class Connection(ExportImport.ExportImport):
or
invalid(None)
):
raise ConflictError, `oid`
raise ConflictError(object=object)
self._invalidating.append(oid)
klass = object.__class__
......@@ -459,7 +459,7 @@ class Connection(ExportImport.ExportImport):
if invalid(oid) or invalid(None):
if not hasattr(object.__class__, '_p_independent'):
get_transaction().register(self)
raise ConflictError(`oid`, `object.__class__`)
raise ReadConflictError(object=object)
invalid=1
else:
invalid=0
......@@ -484,7 +484,7 @@ class Connection(ExportImport.ExportImport):
except KeyError: pass
else:
get_transaction().register(self)
raise ConflictError(`oid`, `object.__class__`)
raise ConflictError(object=object)
except ConflictError:
raise
......@@ -544,7 +544,7 @@ class Connection(ExportImport.ExportImport):
def tpc_begin(self, transaction, sub=None):
if self._invalid(None): # Some nitwit invalidated everything!
raise ConflictError, "transaction already invalidated"
raise ConflictError("transaction already invalidated")
self._invalidating=[]
self._creating=[]
......
......@@ -78,7 +78,7 @@ method::
and call it to monitor the storage.
"""
__version__='$Revision: 1.8 $'[11:-2]
__version__='$Revision: 1.9 $'[11:-2]
import base64, time, string
from ZODB import POSException, BaseStorage, utils
......@@ -244,7 +244,8 @@ class DemoStorage(BaseStorage.BaseStorage):
else:
nv=old
if serial != oserial: raise POSException.ConflictError
if serial != oserial:
raise POSException.ConflictError(serials=(oserial, serial))
serial=self._serial
r=[oid, serial, old, version and (version, nv) or None, data]
......
......@@ -114,20 +114,19 @@
# may have a back pointer to a version record or to a non-version
# record.
#
__version__='$Revision: 1.76 $'[11:-2]
__version__='$Revision: 1.77 $'[11:-2]
import struct, time, os, bpthread, string, base64, sys
import struct, time, os, string, base64, sys
from struct import pack, unpack
from cPickle import loads
import POSException
from POSException import UndoError
from TimeStamp import TimeStamp
from lock_file import lock_file
from utils import t32, p64, U64, cp
from zLOG import LOG, WARNING, ERROR, PANIC, register_subsystem
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC, register_subsystem
register_subsystem('ZODB FS')
import BaseStorage
from cPickle import Pickler, Unpickler
from cPickle import Pickler, Unpickler, loads
import ConflictResolution
try: from posix import fsync
......@@ -185,6 +184,7 @@ class FileStorage(BaseStorage.BaseStorage,
create = 1
if read_only:
self._is_read_only = 1
if create:
raise ValueError, "can\'t create a read-only file"
elif stop is not None:
......@@ -242,6 +242,7 @@ class FileStorage(BaseStorage.BaseStorage,
self._file, file_name, index, vindex, tindex, stop,
read_only=read_only,
)
self._ltid = tid
self._ts = tid = TimeStamp(tid)
t = time.time()
......@@ -262,7 +263,8 @@ class FileStorage(BaseStorage.BaseStorage,
self._index_get=index.get
self._vindex_get=vindex.get
def __len__(self): return len(self._index)
def __len__(self):
return len(self._index)
def _newIndexes(self):
# hook to use something other than builtin dict
......@@ -389,13 +391,20 @@ class FileStorage(BaseStorage.BaseStorage,
def close(self):
self._file.close()
if hasattr(self,'_lock_file'): self._lock_file.close()
if self._tfile: self._tfile.close()
try: self._save_index()
except: pass # We don't care if this fails.
if hasattr(self,'_lock_file'):
self._lock_file.close()
if self._tfile:
self._tfile.close()
try:
self._save_index()
except:
# XXX should log the error, though
pass # We don't care if this fails.
def commitVersion(self, src, dest, transaction, abort=None):
# We are going to commit by simply storing back pointers.
if self._is_read_only:
raise POSException.ReadOnlyError()
if not (src and isinstance(src, StringType)
and isinstance(dest, StringType)):
raise POSException.VersionCommitError('Invalid source version')
......@@ -413,9 +422,8 @@ class FileStorage(BaseStorage.BaseStorage,
self._lock_acquire()
try:
file=self._file
read=file.read
seek=file.seek
read=self._file.read
seek=self._file.seek
tfile=self._tfile
write=tfile.write
tindex=self._tindex
......@@ -607,6 +615,8 @@ class FileStorage(BaseStorage.BaseStorage,
finally: self._lock_release()
def store(self, oid, serial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
......@@ -632,8 +642,8 @@ class FileStorage(BaseStorage.BaseStorage,
if serial != oserial:
data=self.tryToResolveConflict(oid, oserial, serial, data)
if not data:
raise POSException.ConflictError, (
serial, oserial)
raise POSException.ConflictError(oid=oid,
serials=(oserial, serial))
else:
oserial=serial
......@@ -672,13 +682,17 @@ class FileStorage(BaseStorage.BaseStorage,
finally:
self._lock_release()
def supportsUndo(self): return 1
def supportsVersions(self): return 1
def supportsUndo(self):
return 1
def supportsVersions(self):
return 1
def _clear_temp(self):
self._tindex.clear()
self._tvindex.clear()
self._tfile.seek(0)
if self._tfile is not None:
self._tfile.seek(0)
def _begin(self, tid, u, d, e):
self._thl=23+len(u)+len(d)+len(e)
......@@ -702,9 +716,9 @@ class FileStorage(BaseStorage.BaseStorage,
# We have to check lengths here because struct.pack
# doesn't raise an exception on overflow!
if luser > 65535: raise FileStorageError, 'user name too long'
if ldesc > 65535: raise FileStorageError, 'description too long'
if lext > 65535: raise FileStorageError, 'too much extension data'
if luser > 65535: raise FileStorageError('user name too long')
if ldesc > 65535: raise FileStorageError('description too long')
if lext > 65535: raise FileStorageError('too much extension data')
tlen=self._thl
pos=self._pos
......@@ -718,7 +732,7 @@ class FileStorage(BaseStorage.BaseStorage,
# suspect.
write(pack(
">8s" "8s" "c" "H" "H" "H"
,tid, stl, 'c', luser, ldesc, lext,
,tid, stl,'c', luser, ldesc, lext,
))
if user: write(user)
if desc: write(desc)
......@@ -754,6 +768,7 @@ class FileStorage(BaseStorage.BaseStorage,
self._index.update(self._tindex)
self._vindex.update(self._tvindex)
self._ltid = tid
def _abort(self):
if self._nextpos:
......@@ -761,6 +776,8 @@ class FileStorage(BaseStorage.BaseStorage,
self._nextpos=0
def undo(self, transaction_id):
if self._is_read_only:
raise POSException.ReadOnlyError()
self._lock_acquire()
try:
self._clear_index()
......@@ -808,7 +825,8 @@ class FileStorage(BaseStorage.BaseStorage,
return t.keys()
finally: self._lock_release()
def supportsTransactionalUndo(self): return 1
def supportsTransactionalUndo(self):
return 1
def _undoDataInfo(self, oid, pos, tpos):
"""Return the serial, data pointer, data, and version for the oid
......@@ -930,7 +948,9 @@ class FileStorage(BaseStorage.BaseStorage,
# writing a file position rather than a pickle. Sometimes, we
# may do conflict resolution, in which case we actually copy
# new data that results from resolution.
if self._is_read_only:
raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
......@@ -1175,6 +1195,8 @@ class FileStorage(BaseStorage.BaseStorage,
the associated data are copied, since the old records are not copied.
"""
if self._is_read_only:
raise POSException.ReadOnlyError()
# Ugh, this seems long
packing=1 # are we in the packing phase (or the copy phase)
......@@ -1551,8 +1573,32 @@ class FileStorage(BaseStorage.BaseStorage,
self._packt=z64
_lock_release()
def iterator(self):
return FileIterator(self._file_name)
def iterator(self, start=None, stop=None):
return FileIterator(self._file_name, start, stop)
def lastTransaction(self):
"""Return transaction id for last committed transaction"""
return self._ltid
def lastSerial(self, oid):
"""Return last serialno committed for object oid.
If there is no serialno for this oid -- which can only occur
if it is a new object -- return None.
"""
try:
pos = self._index[oid]
except KeyError:
return None
self._file.seek(pos)
# first 8 bytes are oid, second 8 bytes are serialno
h = self._file.read(16)
if len(h) < 16:
raise CorruptedDataError, h
if h[:8] != oid:
h = h + self._file.read(26) # get rest of header
raise CorruptedDataError, h
return h[8:]
def shift_transactions_forward(index, vindex, tindex, file, pos, opos):
"""Copy transactions forward in the data file
......@@ -1969,14 +2015,46 @@ class FileIterator(Iterator):
"""
_ltid=z64
def __init__(self, file):
def __init__(self, file, start=None, stop=None):
if isinstance(file, StringType):
file=open(file, 'rb')
self._file=file
if file.read(4) != packed_version: raise FileStorageFormatError, name
file = open(file, 'rb')
self._file = file
if file.read(4) != packed_version:
raise FileStorageFormatError, name
file.seek(0,2)
self._file_size=file.tell()
self._pos=4L
self._file_size = file.tell()
self._pos = 4L
assert start is None or isinstance(start, StringType)
assert stop is None or isinstance(stop, StringType)
if start:
self._skip_to_start(start)
self._stop = stop
def _skip_to_start(self, start):
# Scan through the transaction records doing almost no sanity
# checks.
while 1:
self._file.seek(self._pos)
h = self._file.read(16)
if len(h) < 16:
return
tid, stl = unpack(">8s8s", h)
if tid >= start:
return
tl = U64(stl)
try:
self._pos += tl + 8
except OverflowError:
self._pos = long(self._pos) + tl + 8
if __debug__:
# Sanity check
self._file.seek(self._pos - 8, 0)
rtl = self._file.read(8)
if rtl != stl:
pos = self._file.tell() - 8
panic("%s has inconsistent transaction length at %s "
"(%s != %s)",
self._file.name, pos, U64(rtl), U64(stl))
def next(self, index=0):
file=self._file
......@@ -1984,6 +2062,7 @@ class FileIterator(Iterator):
read=file.read
pos=self._pos
LOG("ZODB FS", BLATHER, "next(%d)" % index)
while 1:
# Read the transaction record
seek(pos)
......@@ -1994,7 +2073,7 @@ class FileIterator(Iterator):
if el < 0: el=t32-el
if tid <= self._ltid:
warn("%s time-stamp reduction at %s", name, pos)
warn("%s time-stamp reduction at %s", self._file.name, pos)
self._ltid=tid
tl=U64(stl)
......@@ -2004,11 +2083,12 @@ class FileIterator(Iterator):
# cleared. They may also be corrupted,
# in which case, we don't want to totally lose the data.
warn("%s truncated, possibly due to damaged records at %s",
name, pos)
self._file.name, pos)
break
if status not in ' up':
warn('%s has invalid status, %s, at %s', name, status, pos)
warn('%s has invalid status, %s, at %s', self._file.name,
status, pos)
if tl < (23+ul+dl+el):
# We're in trouble. Find out if this is bad data in
......@@ -2022,16 +2102,24 @@ class FileIterator(Iterator):
# reasonable:
if self._file_size - rtl < pos or rtl < 23:
nearPanic('%s has invalid transaction header at %s',
name, pos)
self._file.name, pos)
warn("It appears that there is invalid data at the end of "
"the file, possibly due to a system crash. %s "
"truncated to recover from bad data at end."
% name)
% self._file.name)
break
else:
warn('%s has invalid transaction header at %s', name, pos)
warn('%s has invalid transaction header at %s',
self._file.name, pos)
break
if self._stop is not None:
LOG("ZODB FS", BLATHER,
("tid %x > stop %x ? %d" %
(U64(tid), U64(self._stop), tid > self._stop)))
if self._stop is not None and tid > self._stop:
raise IndexError, index
tpos=pos
tend=tpos+tl
......@@ -2041,7 +2129,7 @@ class FileIterator(Iterator):
h=read(8)
if h != stl:
panic('%s has inconsistent transaction length at %s',
name, pos)
self._file.name, pos)
pos=tend+8
continue
......@@ -2067,7 +2155,7 @@ class FileIterator(Iterator):
h=read(8)
if h != stl:
warn("%s redundant transaction length check failed at %s",
name, pos)
self._file.name, pos)
break
self._pos=pos+8
......@@ -2075,7 +2163,7 @@ class FileIterator(Iterator):
raise IndexError, index
class RecordIterator(Iterator):
class RecordIterator(Iterator, BaseStorage.TransactionRecord):
"""Iterate over the transactions in a FileStorage file.
"""
def __init__(self, tid, status, user, desc, ext, pos, stuff):
......@@ -2127,9 +2215,8 @@ class RecordIterator(Iterator):
return r
raise IndexError, index
class Record:
class Record(BaseStorage.DataRecord):
"""An abstract database record
"""
def __init__(self, *args):
......
......@@ -86,7 +86,7 @@ method::
and call it to minotor the storage.
"""
__version__='$Revision: 1.4 $'[11:-2]
__version__='$Revision: 1.5 $'[11:-2]
import POSException, BaseStorage, string, utils
from TimeStamp import TimeStamp
......@@ -136,7 +136,8 @@ class MappingStorage(BaseStorage.BaseStorage):
if self._index.has_key(oid):
old=self._index[oid]
oserial=old[:8]
if serial != oserial: raise POSException.ConflictError
if serial != oserial:
raise POSException.ConflictError(serials=(oserial, serial))
serial=self._serial
self._tindex.append((oid,serial+data))
......
......@@ -10,14 +10,14 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
'''BoboPOS-defined exceptions
"""BoboPOS-defined exceptions
$Id: POSException.py,v 1.8 2001/11/28 15:51:20 matt Exp $'''
__version__='$Revision: 1.8 $'[11:-2]
$Id: POSException.py,v 1.9 2002/01/17 17:34:33 jeremy Exp $"""
__version__ = '$Revision: 1.9 $'.split()[-2:][0]
from string import join
StringType=type('')
DictType=type({})
from types import StringType, DictType
from ZODB import utils
class POSError(Exception):
"""Persistent object system error
......@@ -28,10 +28,94 @@ class TransactionError(POSError):
"""
class ConflictError(TransactionError):
"""Two transactions tried to modify the same object at once
"""Two transactions tried to modify the same object at once. This
transaction should be resubmitted.
Instance attributes:
oid : string
the OID (8-byte packed string) of the object in conflict
class_name : string
the fully-qualified name of that object's class
message : string
a human-readable explanation of the error
serials : (string, string)
a pair of 8-byte packed strings; these are the serial numbers
(old and new) of the object in conflict. (Serial numbers are
closely related [equal?] to transaction IDs; a ConflictError may
be triggered by a serial number mismatch.)
The caller should pass either object or oid as a keyword argument,
but not both of them. If object is passed, it should be a
persistent object with an _p_oid attribute.
"""
This transaction should be resubmitted.
def __init__(self, message=None, object=None, oid=None, serials=None):
if message is None:
self.message = "database conflict error"
else:
self.message = message
if object is None:
self.oid = None
self.class_name = None
else:
self.oid = object._p_oid
klass = object.__class__
self.class_name = klass.__module__ + "." + klass.__name__
if oid is not None:
assert self.oid is None
self.oid = oid
self.serials = serials
def __str__(self):
extras = []
if self.oid:
extras.append("oid %016x" % utils.U64(self.oid))
if self.class_name:
extras.append("class %s" % self.class_name)
if self.serials:
extras.append("serial was %016x, now %016x" %
tuple(map(utils.U64, self.serials)))
if extras:
return "%s (%s)" % (self.message, ", ".join(extras))
else:
return self.message
def get_oid(self):
return self.oid
def get_class_name(self):
return self.class_name
def get_old_serial(self):
return self.serials[0]
def get_new_serial(self):
return self.serials[1]
def get_serials(self):
return self.serials
class ReadConflictError(ConflictError):
"""A conflict detected at read time -- attempt to read an object
that has changed in another transaction (eg. another thread
or process).
"""
def __init__(self, message=None, object=None, serials=None):
if message is None:
message = "database read conflict error"
ConflictError.__init__(self, message=message, object=object,
serials=serials)
class BTreesConflictError(ConflictError):
"""A special subclass for BTrees conflict errors, which return
an undocumented four-tuple."""
def __init__(self, *btree_args):
ConflictError.__init__(self, message="BTrees conflict error")
self.btree = btree_args
class VersionError(POSError):
"""An error in handling versions occurred
......@@ -84,6 +168,10 @@ class MountedStorageError(StorageError):
"""Unable to access mounted storage.
"""
class ReadOnlyError(StorageError):
"""Unable to modify objects in a read-only storage.
"""
class ExportError(POSError):
"""An export file doesn't have the right format.
"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment