Commit 2776efa0 authored by Barry Warsaw's avatar Barry Warsaw

Added support for the `iterator' interface for storages.

Specifically,

_loadSerialEx(): New method which is similar to the API method
loadSerial() except that it returns both the pickle and the version
string for the specified oid+serial pair.  This is not a public
method.

loadSerial(): Reimplement in terms of _loadSerialEx() to reduce code
duplication.

iterator(): Public method to return a "transactions iterator".

_nexttxn(): Helper method to return the transaction metadata for the
transaction following the given in the argument.  Raises IndexError if
there is no next transaction.

_alltxnoids(): Helper method to return a list of all the oids modified
in the given transaction.  Note that the Full Berkeley storage has the
implied semantics that if an object is modified more than once in a
transaction, only the last such modification is retained.

Classes _TransactionsIterator, _RecordsIterator, _Record are private
helper classes to support the iterator interface.
parent 4cffef54
...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support ...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support
undo or versioning. undo or versioning.
""" """
__version__ = '$Revision: 1.24 $'[-2:][0] __version__ = '$Revision: 1.25 $'[-2:][0]
import struct import struct
import time import time
...@@ -260,7 +260,8 @@ class Full(BerkeleyBase): ...@@ -260,7 +260,8 @@ class Full(BerkeleyBase):
# version for now. # version for now.
# #
# FIXME: need to watch for two object revisions in the # FIXME: need to watch for two object revisions in the
# same transaction and only bump the refcount once. # same transaction and only bump the refcount once,
# since we only keep the last of any such revisions.
refdoids = [] refdoids = []
referencesf(pickle, refdoids) referencesf(pickle, refdoids)
for roid in refdoids: for roid in refdoids:
...@@ -500,18 +501,27 @@ class Full(BerkeleyBase): ...@@ -500,18 +501,27 @@ class Full(BerkeleyBase):
finally: finally:
self._lock_release() self._lock_release()
def loadSerial(self, oid, serial): def _loadSerialEx(self, oid, serial):
# Return the revision of the object with the given serial number. # Just like loadSerial, except that it returns both the pickle and the
# version this object revision is living in.
self._lock_acquire() self._lock_acquire()
try: try:
# Get the pointer to the pickle (i.e. live revid, or lrevid) # Get the pointer to the pickle (i.e. live revid, or lrevid)
# corresponding to the oid and the supplied serial # corresponding to the oid and the supplied serial
# a.k.a. revision. # a.k.a. revision.
lrevid = self._metadata[oid+serial][16:24] vid, ign, lrevid = struct.unpack(
return self._pickles[oid+lrevid] '>8s8s8s', self._metadata[oid+serial][:24])
if vid == ZERO:
version = ''
else:
version = self._versions[vid]
return self._pickles[oid+lrevid], version
finally: finally:
self._lock_release() self._lock_release()
def loadSerial(self, oid, serial):
return self._loadSerialEx(oid, serial)[0]
def __findcreatevid(self, version): def __findcreatevid(self, version):
# Get the vid associated with a version string, or create one if there # Get the vid associated with a version string, or create one if there
# is no vid for the version. # is no vid for the version.
...@@ -1059,6 +1069,10 @@ class Full(BerkeleyBase): ...@@ -1059,6 +1069,10 @@ class Full(BerkeleyBase):
self._lock_release() self._lock_release()
# GCable interface, for cyclic garbage collection # GCable interface, for cyclic garbage collection
#
# BAW: Note that the GCable interface methods are largely untested.
# Support for these is off the table for the 1.0 release of the Berkeley
# storage.
def gcTrash(oids): def gcTrash(oids):
"""Given a list of oids, treat them as trash. """Given a list of oids, treat them as trash.
...@@ -1124,6 +1138,64 @@ class Full(BerkeleyBase): ...@@ -1124,6 +1138,64 @@ class Full(BerkeleyBase):
c.close() c.close()
self._lock_release() self._lock_release()
# Fail-safe `iterator' interface, used to copy and analyze storage
# transaction data.
def iterator(self):
"""Get a transactions iterator for the storage."""
return _TransactionsIterator(self)
def _nexttxn(self, tid):
self._lock_acquire()
c = self._txnMetadata.cursor()
try:
# Berkeley raises DBNotFound exceptions (a.k.a. KeyError) to
# signal that it's at the end of records. Turn these into
# IndexError to signal the end of iteration.
try:
if tid is None:
# We want the first transaction
rec = c.first()
else:
# Get the next transaction after the specified one.
c.set(tid)
rec = c.next()
except KeyError:
raise IndexError
if rec is None:
raise IndexError
tid, data = rec
# Now unpack the necessary information. Don't impedence match the
# status flag (that's done by the caller).
status = data[0]
userlen, desclen = struct.unpack('>II', data[1:9])
user = data[9:9+userlen]
desc = data[9+userlen:9+userlen+desclen]
ext = data[9+userlen+desclen:]
return tid, status, user, desc, ext
finally:
if c:
c.close()
self._lock_release()
def _alltxnoids(self, tid):
self._lock_acquire()
c = self._txnoids.cursor()
try:
oids = []
oidkeys = {}
rec = c.set(tid)
while rec:
# Ignore the key
oid = rec[1]
if not oidkeys.has_key(oid):
oids.append(oid)
oidkeys[oid] = 1
rec = c.next_dup()
return oids
finally:
c.close()
self._lock_release()
# Other interface assertions # Other interface assertions
def supportsTransactionalUndo(self): def supportsTransactionalUndo(self):
return 1 return 1
...@@ -1133,3 +1205,97 @@ class Full(BerkeleyBase): ...@@ -1133,3 +1205,97 @@ class Full(BerkeleyBase):
def supportsVersions(self): def supportsVersions(self):
return 1 return 1
class _TransactionsIterator:
"""Provide forward iteration through the transactions in a storage.
Transactions *must* be accessed sequentially (e.g. with a for loop).
"""
def __init__(self, storage):
self._storage = storage
self._tid = None
def __getitem__(self, i):
"""Return the ith item in the sequence of transaction data.
Items must be accessed sequentially, and are instances of
RecordsIterator. An IndexError will be raised after all of the items
have been returned.
"""
# Let IndexErrors percolate up.
tid, status, user, desc, ext = self._storage._nexttxn(self._tid)
self._tid = tid
return _RecordsIterator(self._storage, tid, status, user, desc, ext)
class _RecordsIterator:
"""Provide transaction meta-data and forward iteration through the
transactions in a storage.
Items *must* be accessed sequentially (e.g. with a for loop).
"""
# Transaction id as an 8-byte timestamp string
tid = None
# Transaction status code;
# ' ' -- normal
# 'p' -- Transaction has been packed, and contains incomplete data.
#
# Note that undone ('u') and checkpoint transactions ('c') should not be
# included.
status = None
# The standard transaction metadata
user = None
description = None
_extension = None
def __init__(self, storage, tid, status, user, desc, ext):
self._storage = storage
self.tid = tid
# Impedence matching
if status == UNDOABLE_TRANSACTION:
self.status = ' '
else:
self.status = 'p'
self.user = user
self.description = desc
self._extension = ext
# Internal pointer
self._oids = self._storage._alltxnoids(self.tid)
# To make .pop() more efficient
self._oids.reverse()
def __getitem__(self, i):
"""Return the ith item in the sequence of record data.
Items must be accessed sequentially, and are instances of Record. An
IndexError will be raised after all of the items have been
returned.
"""
# Let IndexError percolate up
oid = self._oids.pop()
pickle, version = self._storage._loadSerialEx(oid, self.tid)
return _Record(oid, self.tid, version, pickle)
class _Record:
# Object Id
oid = None
# Object serial number (i.e. revision id)
serial = None
# Version string
version = None
# Data pickle
data = None
def __init__(self, oid, serial, version, data):
self.oid = oid
self.serial = serial
self.version = version
self.data = data
...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support ...@@ -4,7 +4,7 @@ See Minimal.py for an implementation of Berkeley storage that does not support
undo or versioning. undo or versioning.
""" """
__version__ = '$Revision: 1.24 $'[-2:][0] __version__ = '$Revision: 1.25 $'[-2:][0]
import struct import struct
import time import time
...@@ -260,7 +260,8 @@ class Full(BerkeleyBase): ...@@ -260,7 +260,8 @@ class Full(BerkeleyBase):
# version for now. # version for now.
# #
# FIXME: need to watch for two object revisions in the # FIXME: need to watch for two object revisions in the
# same transaction and only bump the refcount once. # same transaction and only bump the refcount once,
# since we only keep the last of any such revisions.
refdoids = [] refdoids = []
referencesf(pickle, refdoids) referencesf(pickle, refdoids)
for roid in refdoids: for roid in refdoids:
...@@ -500,18 +501,27 @@ class Full(BerkeleyBase): ...@@ -500,18 +501,27 @@ class Full(BerkeleyBase):
finally: finally:
self._lock_release() self._lock_release()
def loadSerial(self, oid, serial): def _loadSerialEx(self, oid, serial):
# Return the revision of the object with the given serial number. # Just like loadSerial, except that it returns both the pickle and the
# version this object revision is living in.
self._lock_acquire() self._lock_acquire()
try: try:
# Get the pointer to the pickle (i.e. live revid, or lrevid) # Get the pointer to the pickle (i.e. live revid, or lrevid)
# corresponding to the oid and the supplied serial # corresponding to the oid and the supplied serial
# a.k.a. revision. # a.k.a. revision.
lrevid = self._metadata[oid+serial][16:24] vid, ign, lrevid = struct.unpack(
return self._pickles[oid+lrevid] '>8s8s8s', self._metadata[oid+serial][:24])
if vid == ZERO:
version = ''
else:
version = self._versions[vid]
return self._pickles[oid+lrevid], version
finally: finally:
self._lock_release() self._lock_release()
def loadSerial(self, oid, serial):
return self._loadSerialEx(oid, serial)[0]
def __findcreatevid(self, version): def __findcreatevid(self, version):
# Get the vid associated with a version string, or create one if there # Get the vid associated with a version string, or create one if there
# is no vid for the version. # is no vid for the version.
...@@ -1059,6 +1069,10 @@ class Full(BerkeleyBase): ...@@ -1059,6 +1069,10 @@ class Full(BerkeleyBase):
self._lock_release() self._lock_release()
# GCable interface, for cyclic garbage collection # GCable interface, for cyclic garbage collection
#
# BAW: Note that the GCable interface methods are largely untested.
# Support for these is off the table for the 1.0 release of the Berkeley
# storage.
def gcTrash(oids): def gcTrash(oids):
"""Given a list of oids, treat them as trash. """Given a list of oids, treat them as trash.
...@@ -1124,6 +1138,64 @@ class Full(BerkeleyBase): ...@@ -1124,6 +1138,64 @@ class Full(BerkeleyBase):
c.close() c.close()
self._lock_release() self._lock_release()
# Fail-safe `iterator' interface, used to copy and analyze storage
# transaction data.
def iterator(self):
"""Get a transactions iterator for the storage."""
return _TransactionsIterator(self)
def _nexttxn(self, tid):
self._lock_acquire()
c = self._txnMetadata.cursor()
try:
# Berkeley raises DBNotFound exceptions (a.k.a. KeyError) to
# signal that it's at the end of records. Turn these into
# IndexError to signal the end of iteration.
try:
if tid is None:
# We want the first transaction
rec = c.first()
else:
# Get the next transaction after the specified one.
c.set(tid)
rec = c.next()
except KeyError:
raise IndexError
if rec is None:
raise IndexError
tid, data = rec
# Now unpack the necessary information. Don't impedence match the
# status flag (that's done by the caller).
status = data[0]
userlen, desclen = struct.unpack('>II', data[1:9])
user = data[9:9+userlen]
desc = data[9+userlen:9+userlen+desclen]
ext = data[9+userlen+desclen:]
return tid, status, user, desc, ext
finally:
if c:
c.close()
self._lock_release()
def _alltxnoids(self, tid):
self._lock_acquire()
c = self._txnoids.cursor()
try:
oids = []
oidkeys = {}
rec = c.set(tid)
while rec:
# Ignore the key
oid = rec[1]
if not oidkeys.has_key(oid):
oids.append(oid)
oidkeys[oid] = 1
rec = c.next_dup()
return oids
finally:
c.close()
self._lock_release()
# Other interface assertions # Other interface assertions
def supportsTransactionalUndo(self): def supportsTransactionalUndo(self):
return 1 return 1
...@@ -1133,3 +1205,97 @@ class Full(BerkeleyBase): ...@@ -1133,3 +1205,97 @@ class Full(BerkeleyBase):
def supportsVersions(self): def supportsVersions(self):
return 1 return 1
class _TransactionsIterator:
"""Provide forward iteration through the transactions in a storage.
Transactions *must* be accessed sequentially (e.g. with a for loop).
"""
def __init__(self, storage):
self._storage = storage
self._tid = None
def __getitem__(self, i):
"""Return the ith item in the sequence of transaction data.
Items must be accessed sequentially, and are instances of
RecordsIterator. An IndexError will be raised after all of the items
have been returned.
"""
# Let IndexErrors percolate up.
tid, status, user, desc, ext = self._storage._nexttxn(self._tid)
self._tid = tid
return _RecordsIterator(self._storage, tid, status, user, desc, ext)
class _RecordsIterator:
"""Provide transaction meta-data and forward iteration through the
transactions in a storage.
Items *must* be accessed sequentially (e.g. with a for loop).
"""
# Transaction id as an 8-byte timestamp string
tid = None
# Transaction status code;
# ' ' -- normal
# 'p' -- Transaction has been packed, and contains incomplete data.
#
# Note that undone ('u') and checkpoint transactions ('c') should not be
# included.
status = None
# The standard transaction metadata
user = None
description = None
_extension = None
def __init__(self, storage, tid, status, user, desc, ext):
self._storage = storage
self.tid = tid
# Impedence matching
if status == UNDOABLE_TRANSACTION:
self.status = ' '
else:
self.status = 'p'
self.user = user
self.description = desc
self._extension = ext
# Internal pointer
self._oids = self._storage._alltxnoids(self.tid)
# To make .pop() more efficient
self._oids.reverse()
def __getitem__(self, i):
"""Return the ith item in the sequence of record data.
Items must be accessed sequentially, and are instances of Record. An
IndexError will be raised after all of the items have been
returned.
"""
# Let IndexError percolate up
oid = self._oids.pop()
pickle, version = self._storage._loadSerialEx(oid, self.tid)
return _Record(oid, self.tid, version, pickle)
class _Record:
# Object Id
oid = None
# Object serial number (i.e. revision id)
serial = None
# Version string
version = None
# Data pickle
data = None
def __init__(self, oid, serial, version, data):
self.oid = oid
self.serial = serial
self.version = version
self.data = data
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment