Commit 7f42c6c7 authored by Christian Theune's avatar Christian Theune

- different cleanups

 - merged from head
 - added configuration methods to configure a blobfilestorage
 - made stuff work ;)
parent 6f66bbcb
...@@ -128,21 +128,37 @@ cZopeInterface = Extension( ...@@ -128,21 +128,37 @@ cZopeInterface = Extension(
sources= ['src/zope/interface/_zope_interface_coptimizations.c'] sources= ['src/zope/interface/_zope_interface_coptimizations.c']
) )
exts += [cPersistence, cPickleCache, TimeStamp, winlock, cZopeInterface] cZopeProxy = Extension(
name = 'zope.proxy._zope_proxy_proxy',
sources= ['src/zope/proxy/_zope_proxy_proxy.c']
)
exts += [cPersistence,
cPickleCache,
TimeStamp,
winlock,
cZopeInterface,
cZopeProxy,
]
# The ZODB.zodb4 code is not being packaged, because it is only # The ZODB.zodb4 code is not being packaged, because it is only
# need to convert early versions of Zope3 databases to ZODB3. # need to convert early versions of Zope3 databases to ZODB3.
packages = ["BTrees", "BTrees.tests", packages = ["BTrees", "BTrees.tests",
"ZEO", "ZEO.auth", "ZEO.zrpc", "ZEO.tests", "ZEO", "ZEO.auth", "ZEO.zrpc", "ZEO.tests",
"ZODB", "ZODB.FileStorage", "ZODB.Blobs", "ZODB", "ZODB.FileStorage", "ZODB.Blobs", "ZODB.Blobs.tests",
"ZODB.tests", "ZODB.tests",
"Persistence", "Persistence.tests", "Persistence", "Persistence.tests",
"persistent", "persistent.tests", "persistent", "persistent.tests",
"transaction", "transaction.tests", "transaction", "transaction.tests",
"ThreadedAsync", "ThreadedAsync",
"zdaemon", "zdaemon.tests", "zdaemon", "zdaemon.tests",
"zope", "zope.interface", "zope.testing",
"zope",
"zope.interface", "zope.interface.tests",
"zope.proxy", "zope.proxy.tests",
"zope.testing",
"ZopeUndo", "ZopeUndo.tests", "ZopeUndo", "ZopeUndo.tests",
"ZConfig", "ZConfig.tests", "ZConfig", "ZConfig.tests",
"ZConfig.components", "ZConfig.components",
...@@ -187,6 +203,7 @@ def copy_other_files(cmd, outputbase): ...@@ -187,6 +203,7 @@ def copy_other_files(cmd, outputbase):
"ZODB/tests", "ZODB/tests",
"zdaemon", "zdaemon",
"zdaemon/tests", "zdaemon/tests",
"zope/interface", "zope/interface/tests",
]: ]:
dir = convert_path(dir) dir = convert_path(dir)
inputdir = os.path.join("src", dir) inputdir = os.path.join("src", dir)
......
...@@ -252,6 +252,12 @@ class BaseStorage(UndoLogCompatible): ...@@ -252,6 +252,12 @@ class BaseStorage(UndoLogCompatible):
pass pass
def tpc_finish(self, transaction, f=None): def tpc_finish(self, transaction, f=None):
# It's important that the storage calls the function we pass
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
self._lock_acquire() self._lock_acquire()
try: try:
if transaction is not self._transaction: if transaction is not self._transaction:
......
import os import os
import tempfile
from zope.interface import implements from zope.interface import implements
...@@ -8,33 +9,22 @@ from ZODB.Blobs.exceptions import BlobError ...@@ -8,33 +9,22 @@ from ZODB.Blobs.exceptions import BlobError
from ZODB import utils from ZODB import utils
from persistent import Persistent from persistent import Persistent
class TempFileHandler(object): try:
"""Handles holding a tempfile around. from ZPublisher.Iterators import IStreamIterator
except ImportError:
The tempfile is unlinked when the tempfilehandler is GCed. IStreamIterator = None
"""
def __init__(self, directory, mode)
self.handle, self.filename = tempfile.mkstemp(dir=directory,
text=mode)
def __del__(self):
self.handle
os.unlink(self.filename)
class Blob(Persistent): class Blob(Persistent):
implements(IBlob) implements(IBlob)
def __init__(self): _p_blob_readers = 0
self._p_blob_readers = 0 _p_blob_writers = 0
self._p_blob_writers = 0 _p_blob_uncommitted = None
self._p_blob_uncommitted = None _p_blob_data = None
self._p_blob_data = None
def open(self, mode): def open(self, mode):
"""Returns a file(-like) object for handling the blob data.""" """Returns a file(-like) object for handling the blob data."""
if mode == "r": if mode == "r":
if self._current_filename() is None: if self._current_filename() is None:
raise BlobError, "Blob does not exist." raise BlobError, "Blob does not exist."
...@@ -43,17 +33,17 @@ class Blob(Persistent): ...@@ -43,17 +33,17 @@ class Blob(Persistent):
raise BlobError, "Already opened for writing." raise BlobError, "Already opened for writing."
self._p_blob_readers += 1 self._p_blob_readers += 1
return BlobTempFile(self._current_filename(), "rb", self) return BlobFile(self._current_filename(), "rb", self)
if mode == "w": if mode == "w":
if self._p_blob_readers != 0: if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading." raise BlobError, "Already opened for reading."
if self._p_blob_uncommitted is None: if self._p_blob_uncommitted is None:
self._p_blob_uncommitted = self._get_uncommitted_filename() self._p_blob_uncommitted = utils.mktemp()
self._p_blob_writers += 1 self._p_blob_writers += 1
return BlobTempFile(self._p_blob_uncommitted, "wb", self) return BlobFile(self._p_blob_uncommitted, "wb", self)
if mode =="a": if mode =="a":
if self._current_filename() is None: if self._current_filename() is None:
...@@ -62,15 +52,15 @@ class Blob(Persistent): ...@@ -62,15 +52,15 @@ class Blob(Persistent):
if self._p_blob_readers != 0: if self._p_blob_readers != 0:
raise BlobError, "Already opened for reading." raise BlobError, "Already opened for reading."
if not self._p_blob_uncommitted: if self._p_blob_uncommitted is None:
# Create a new working copy # Create a new working copy
self._p_blob_uncommitted = self._get_uncommitted_filename() self._p_blob_uncommitted = utils.mktmp()
uncommitted = BlobTempFile(self._p_blob_uncommitted, "wb", self) uncommitted = BlobFile(self._p_blob_uncommitted, "wb", self)
utils.cp(file(self._p_blob_data), uncommitted) utils.cp(file(self._p_blob_data), uncommitted)
uncommitted.seek(0) uncommitted.seek(0)
else: else:
# Re-use existing working copy # Re-use existing working copy
uncommitted = BlobTempFile(self._p_blob_uncommitted, "ab", self) uncommitted = BlobFile(self._p_blob_uncommitted, "ab", self)
self._p_blob_writers +=1 self._p_blob_writers +=1
return uncommitted return uncommitted
...@@ -80,28 +70,29 @@ class Blob(Persistent): ...@@ -80,28 +70,29 @@ class Blob(Persistent):
def _current_filename(self): def _current_filename(self):
return self._p_blob_uncommitted or self._p_blob_data return self._p_blob_uncommitted or self._p_blob_data
def _get_uncommitted_filename(self): class BlobFile(file):
return os.tempnam()
class BlobFileBase:
# XXX those files should be created in the same partition as # XXX those files should be created in the same partition as
# the storage later puts them to avoid copying them ... # the storage later puts them to avoid copying them ...
if IStreamIterator is not None:
__implements__ = (IStreamIterator,)
def __init__(self, name, mode, blob): def __init__(self, name, mode, blob):
file.__init__(self, name, mode) super(BlobFile, self).__init__(name, mode)
self.blob = blob self.blob = blob
self.streamsize = 1<<16
def write(self, data): def write(self, data):
file.write(self, data) super(BlobFile, self).write(data)
self.blob._p_changed = 1 self.blob._p_changed = 1
def writelines(self, lines): def writelines(self, lines):
file.writelines(self, lines) super(BlobFile, self).writelines(lines)
self.blob._p_changed = 1 self.blob._p_changed = 1
def truncate(self, size): def truncate(self, size):
file.truncate(self, size) super(BlobFile, self).truncate(size)
self.blob._p_changed = 1 self.blob._p_changed = 1
def close(self): def close(self):
...@@ -110,15 +101,20 @@ class BlobFileBase: ...@@ -110,15 +101,20 @@ class BlobFileBase:
self.blob._p_blob_writers -= 1 self.blob._p_blob_writers -= 1
else: else:
self.blob._p_blob_readers -= 1 self.blob._p_blob_readers -= 1
file.close(self) super(BlobFile, self).close()
def next(self):
data = self.read(self.streamsize)
if not data:
self.blob._p_blob_readers -= 1
raise StopIteration
return data
class BlobFile(BlobFileBase, file): def __len__(self):
pass cur_pos = self.tell()
self.seek(0, 2)
size = self.tell()
self.seek(cur_pos, 0)
return size
class BlobTempFile(BlobFileBase, NamedTempFile)
pass
def copy_file(old, new):
for chunk in old.read(4096):
new.write(chunk)
new.seek(0)
...@@ -12,41 +12,85 @@ ...@@ -12,41 +12,85 @@
# #
############################################################################## ##############################################################################
import os
from zope.interface import implements from zope.interface import implements
from zope.proxy import ProxyBase from zope.proxy import ProxyBase, getProxiedObject
from ZODB.interfaces import \ from ZODB import utils
IStorageAdapter, IUndoableStorage, IVersioningStorage, IBlobStorage from ZODB.Blobs.interfaces import IBlobStorage, IBlob
class BlobStorage(ProxyBase): class BlobStorage(ProxyBase):
"""A storage to support blobs.""" """A storage to support blobs."""
implements(IBlobStorage) implements(IBlobStorage)
__slots__ = ('base_directory',) __slots__ = ('base_directory', 'dirty_oids')
def __new__(self, base_directory, storage):
return ProxyBase.__new__(self, storage)
def __init__(self, base_directory, storage): def __init__(self, base_directory, storage):
# TODO Complain if storage is ClientStorage
ProxyBase.__init__(self, storage) ProxyBase.__init__(self, storage)
self.base_directory = base_directory self.base_directory = base_directory
self.dirty_oids = []
def storeBlob(oid, serial, data, blob, version, transaction): def storeBlob(self, oid, oldserial, data, blobfilename, version, transaction):
"""Stores data that has a BLOB attached.""" """Stores data that has a BLOB attached."""
if transaction is not self._transaction: serial = self.store(oid, oldserial, data, version, transaction)
raise POSException.StorageTransactionError(self, transaction) assert isinstance(serial, str) # XXX in theory serials could be
# something else
self._lock_acquire() self._lock_acquire()
try: try:
# targetname = self._getCleanFilename(oid, serial)
try:
os.rename(blobfilename, targetname)
except OSError:
target = file(targetname, "wb")
source = file(blobfilename, "rb")
utils.cp(blobfile, target)
target.close()
source.close()
os.unlink(blobfilename)
# XXX if oid already in there, something is really hosed.
# The underlying storage should have complained anyway
self.dirty_oids.append((oid, serial))
finally: finally:
self._lock_release() self._lock_release()
return self._tid return self._tid
def _getDirtyFilename(self, oid):
"""Generate an intermediate filename for two-phase commit.
XXX Not used right now due to conceptual flux. Please keep it around
anyway.
"""
return self._getCleanFilename(oid, "store")
def _getCleanFilename(self, oid, tid):
return "%s/%s-%s.blob" % \
(self.base_directory,
utils.oid_repr(oid),
utils.tid_repr(tid),
)
def _finish(self, tid, u, d, e):
ProxyBase._finish(self, tid, u, d, e)
self.dirty_blobs = []
def loadBlob(oid, serial, version, blob): def _abort(self):
"""Loads the BLOB data for 'oid' into the given blob object. ProxyBase._abort(self)
# Throw away the stuff we'd had committed
while self.dirty_blobs:
oid, serial = self.dirty_blobs.pop()
os.unlink(self._getCleanFilename(oid))
def loadBlob(self, oid, serial, version):
"""Return the filename where the blob file can be found.
""" """
return self._getCleanFilename(oid, serial)
- Blob instances should clean up temporary files after committing - Blob instances should clean up temporary files after committing
- Support database import/export
...@@ -13,3 +13,20 @@ class IBlob(Interface): ...@@ -13,3 +13,20 @@ class IBlob(Interface):
# XXX need a method to initialize the blob from the storage # XXX need a method to initialize the blob from the storage
# this means a) setting the _p_blob_data filename and b) putting # this means a) setting the _p_blob_data filename and b) putting
# the current data in that file # the current data in that file
class IBlobStorage(Interface):
"""A storage supporting BLOBs."""
def storeBlob(oid, oldserial, data, blob, version, transaction):
"""Stores data that has a BLOB attached."""
def loadBlob(oid, serial, version):
"""Return the filename of the Blob data responding to this OID and
serial.
Returns a filename or None if no Blob data is connected with this OID.
"""
def getBlobDirectory():
"""
"""
...@@ -23,15 +23,18 @@ need a Blob with some data: ...@@ -23,15 +23,18 @@ need a Blob with some data:
>>> blob = Blob() >>> blob = Blob()
>>> data = blob.open("w") >>> data = blob.open("w")
>>> data.write("I'm a happy Blob.") >>> data.write("I'm a happy Blob.")
>>> data.close()
We also need a database with a blob supporting storage: We also need a database with a blob supporting storage:
>>> from ZODB.MappingStorage import MappingStorage >>> from ZODB.MappingStorage import MappingStorage
>>> from ZODB.Blobs.BlobStorage import BlobStorage
>>> from ZODB.DB import DB
>>> from tempfile import mkdtemp >>> from tempfile import mkdtemp
>>> base_storage = MappingStorage("test") >>> base_storage = MappingStorage("test")
>>> blob_dir = mkdtemp() >>> blob_dir = mkdtemp()
>>> blob_storage = BlobStorage(blob_dir, base_storage) >>> blob_storage = BlobStorage(blob_dir, base_storage)
>>> database = DB(storage) >>> database = DB(blob_storage)
Putting a Blob into a Connection works like every other object: Putting a Blob into a Connection works like every other object:
...@@ -40,12 +43,11 @@ Putting a Blob into a Connection works like every other object: ...@@ -40,12 +43,11 @@ Putting a Blob into a Connection works like every other object:
>>> root['myblob'] = blob >>> root['myblob'] = blob
>>> import transaction >>> import transaction
>>> transaction.commit() >>> transaction.commit()
>>> connection.close()
Getting stuff out of there works similar: Getting stuff out of there works similar:
>>> connection = database.open() >>> connection2 = database.open()
>>> root = connection.root() >>> root = connection2.root()
>>> blob2 = root['myblob'] >>> blob2 = root['myblob']
>>> IBlob.isImplementedBy(blob2) >>> IBlob.isImplementedBy(blob2)
True True
...@@ -56,17 +58,18 @@ You can't put blobs into a database that has uses a Non-Blob-Storage, though: ...@@ -56,17 +58,18 @@ You can't put blobs into a database that has uses a Non-Blob-Storage, though:
>>> no_blob_storage = MappingStorage() >>> no_blob_storage = MappingStorage()
>>> database2 = DB(no_blob_storage) >>> database2 = DB(no_blob_storage)
>>> connection = database.open() >>> connection3 = database2.open()
>>> root = connection.root() >>> root = connection3.root()
>>> root['myblob'] = blob >>> root['myblob'] = Blob()
>>> transaction.commit() >>> transaction.commit() # doctest: +ELLIPSIS
Traceback (most recent call last): Traceback (most recent call last):
... ...
POSException.Unsupported: Storing Blobs is not supported. Unsupported: Storing Blobs in <ZODB.MappingStorage.MappingStorage instance at ...> is not supported.
While we are testing this, we don't need the storage directory and databases anymore: While we are testing this, we don't need the storage directory and databases anymore:
>>> import os >>> import shutil
>>> os.unlink(blob_dir) >>> shutil.rmtree(blob_dir)
>>> transaction.abort()
>>> database.close() >>> database.close()
>>> database2.close() >>> database2.close()
...@@ -15,4 +15,4 @@ ...@@ -15,4 +15,4 @@
from zope.testing.doctestunit import DocFileSuite from zope.testing.doctestunit import DocFileSuite
def test_suite(): def test_suite():
return DocFileSuite("../README.txt") return DocFileSuite("../Blob.txt", "connection.txt")
...@@ -23,20 +23,24 @@ from time import time ...@@ -23,20 +23,24 @@ from time import time
from persistent import PickleCache from persistent import PickleCache
# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.Blobs.interfaces import IBlob, IBlobStorage
from transaction.interfaces import IDataManager
from zope.interface import implements
import transaction import transaction
from ZODB.ConflictResolution import ResolvedSerial from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport from ZODB.ExportImport import ExportImport
from ZODB.POSException \ from ZODB.POSException \
import ConflictError, ReadConflictError, InvalidObjectReference, \ import ConflictError, ReadConflictError, InvalidObjectReference, \
ConnectionStateError ConnectionStateError, Unsupported
from ZODB.TmpStore import TmpStore from ZODB.TmpStore import TmpStore
from ZODB.utils import u64, oid_repr, z64, positive_id
from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr from ZODB.serialize import ObjectWriter, ConnectionObjectReader, myhasattr
from ZODB.interfaces import IConnection from ZODB.utils import u64, oid_repr, z64, positive_id, \
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36 DEPRECATED_ARGUMENT, deprecated36
from zope.interface import implements
global_reset_counter = 0 global_reset_counter = 0
...@@ -54,127 +58,19 @@ def resetCaches(): ...@@ -54,127 +58,19 @@ def resetCaches():
global_reset_counter += 1 global_reset_counter += 1
class Connection(ExportImport, object): class Connection(ExportImport, object):
"""Connection to ZODB for loading and storing objects. """Connection to ZODB for loading and storing objects."""
The Connection object serves as a data manager. The root() method
on a Connection returns the root object for the database. This
object and all objects reachable from it are associated with the
Connection that loaded them. When a transaction commits, it uses
the Connection to store modified objects.
Typical use of ZODB is for each thread to have its own
Connection and that no thread should have more than one Connection
to the same database. A thread is associated with a Connection by
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
A Connection can be associated with a single version when it is
created. By default, a Connection is not associated with a
version; it uses non-version data.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
database. At transaction boundaries, these copies are updated to
reflect the current state of the database.
You should not instantiate this class directly; instead call the
open() method of a DB instance.
In many applications, root() is the only method of the Connection
that you will need to use.
Synchronization
---------------
A Connection instance is not thread-safe. It is designed to
support a thread model where each thread has its own transaction.
If an application has more than one thread that uses the
connection or the transaction the connection is registered with,
the application should provide locking.
The Connection manages movement of objects in and out of object
storage.
TODO: We should document an intended API for using a Connection via
multiple threads.
TODO: We should explain that the Connection has a cache and that
multiple calls to get() will return a reference to the same
object, provided that one of the earlier objects is still
referenced. Object identity is preserved within a connection, but
not across connections.
TODO: Mention the database pool.
A database connection always presents a consistent view of the
objects in the database, although it may not always present the
most current revision of any particular object. Modifications
made by concurrent transactions are not visible until the next
transaction boundary (abort or commit).
Two options affect consistency. By default, the mvcc and synch
options are enabled by default.
If you pass mvcc=True to db.open(), the Connection will never read
non-current revisions of an object. Instead it will raise a
ReadConflictError to indicate that the current revision is
unavailable because it was written after the current transaction
began.
The logic for handling modifications assumes that the thread that
opened a Connection (called db.open()) is the thread that will use
the Connection. If this is not true, you should pass synch=False
to db.open(). When the synch option is disabled, some transaction
boundaries will be missed by the Connection; in particular, if a
transaction does not involve any modifications to objects loaded
from the Connection and synch is disabled, the Connection will
miss the transaction boundary. Two examples of this behavior are
db.undo() and read-only transactions.
:Groups:
- `User Methods`: root, get, add, close, db, sync, isReadOnly,
cacheGC, cacheFullSweep, cacheMinimize, getVersion,
modifiedInVersion
- `Experimental Methods`: setLocalTransaction, getTransaction,
onCloseCallbacks
- `Transaction Data Manager Methods`: tpc_begin, tpc_vote,
tpc_finish, tpc_abort, sortKey, abort, commit, commit_sub,
abort_sub
- `Database Invalidation Methods`: invalidate, _setDB
- `IPersistentDataManager Methods`: setstate, register,
setklassstate
- `Other Methods`: oldstate, exchange, getDebugInfo, setDebugInfo,
getTransferCounts
""" implements(IConnection, IDataManager, IPersistentDataManager)
implements(IConnection)
_tmp = None _tmp = None
_code_timestamp = 0 _code_timestamp = 0
# ZODB.IConnection
def __init__(self, version='', cache_size=400, def __init__(self, version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None, cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True): synch=True):
"""Create a new Connection. """Create a new Connection."""
A Connection instance should by instantiated by the DB
instance that it is connected to.
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
- `cache_size`: the target size of the in-memory object
cache, measured in objects.
- `cache_deactivate_after`: deprecated, ignored
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
self._log = logging.getLogger("ZODB.Connection") self._log = logging.getLogger("ZODB.Connection")
self._storage = None self._storage = None
self._debug_info = () self._debug_info = ()
...@@ -220,7 +116,7 @@ class Connection(ExportImport, object): ...@@ -220,7 +116,7 @@ class Connection(ExportImport, object):
# from a single transaction should be applied atomically, so # from a single transaction should be applied atomically, so
# the lock must be held when reading _invalidated. # the lock must be held when reading _invalidated.
# It sucks that we have to hold the lock to read _invalidated. # It sucks that we have to hold the lock to read _invalidated.
# Normally, _invalidated is written by calling dict.update, which # Normally, _invalidated is written by calling dict.update, which
# will execute atomically by virtue of the GIL. But some storage # will execute atomically by virtue of the GIL. But some storage
# might generate oids where hash or compare invokes Python code. In # might generate oids where hash or compare invokes Python code. In
...@@ -253,79 +149,20 @@ class Connection(ExportImport, object): ...@@ -253,79 +149,20 @@ class Connection(ExportImport, object):
# to pass to _importDuringCommit(). # to pass to _importDuringCommit().
self._import = None self._import = None
def getTransaction(self): self.connections = None
"""Get the current transaction for this connection.
:deprecated:
The transaction manager's get method works the same as this
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
deprecated36("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread.
:deprecated:
Returns the transaction manager used by the connection. You
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
deprecated36("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._txn_mgr = transaction.TransactionManager()
if self._synch:
self._txn_mgr.registerSynch(self)
return self._txn_mgr
def _cache_items(self):
# find all items on the lru list
items = self._cache.lru_items()
# fine everything. some on the lru list, some not
everything = self._cache.cache_data
# remove those items that are on the lru list
for k,v in items:
del everything[k]
# return a list of [ghosts....not recently used.....recently used]
return everything.items() + items
def __repr__(self): def get_connection(self, database_name):
if self._version: """Return a Connection for the named database."""
ver = ' (in version %s)' % `self._version` connection = self.connections.get(database_name)
else: if connection is None:
ver = '' new_con = self._db.databases[database_name].open()
return '<Connection at %08x%s>' % (positive_id(self), ver) self.connections.update(new_con.connections)
new_con.connections = self.connections
connection = new_con
return connection
def get(self, oid): def get(self, oid):
"""Return the persistent object with oid 'oid'. """Return the persistent object with oid 'oid'."""
If the object was not in the cache and the object's class is
ghostable, then a ghost will be returned. If the object is
already in the cache, a reference to the cached object will be
returned.
Applications seldom need to call this method, because objects
are loaded transparently during attribute lookup.
:return: persistent object corresponding to `oid`
:Parameters:
- `oid`: an object id
:Exceptions:
- `KeyError`: if oid does not exist. It is possible that an
object does not exist as of the current transaction, but
existed in the past. It may even exist again in the
future, if the transaction that removed it is undone.
- `ConnectionStateError`: if the connection is closed.
"""
if self._storage is None: if self._storage is None:
raise ConnectionStateError("The database connection is closed") raise ConnectionStateError("The database connection is closed")
...@@ -347,33 +184,8 @@ class Connection(ExportImport, object): ...@@ -347,33 +184,8 @@ class Connection(ExportImport, object):
self._cache[oid] = obj self._cache[oid] = obj
return obj return obj
# deprecate this method?
__getitem__ = get
def add(self, obj): def add(self, obj):
"""Add a new object 'obj' to the database and assign it an oid. """Add a new object 'obj' to the database and assign it an oid."""
A persistent object is normally added to the database and
assigned an oid when it becomes reachable to an object already in
the database. In some cases, it is useful to create a new
object and use its oid (_p_oid) in a single transaction.
This method assigns a new oid regardless of whether the object
is reachable.
The object is added when the transaction commits. The object
must implement the IPersistent interface and must not
already be associated with a Connection.
:Parameters:
- `obj`: a Persistent object
:Exceptions:
- `TypeError`: if obj is not a persistent object.
- `InvalidObjectReference`: if obj is already associated
with another connection.
- `ConnectionStateError`: if the connection is closed.
"""
if self._storage is None: if self._storage is None:
raise ConnectionStateError("The database connection is closed") raise ConnectionStateError("The database connection is closed")
...@@ -397,72 +209,11 @@ class Connection(ExportImport, object): ...@@ -397,72 +209,11 @@ class Connection(ExportImport, object):
raise InvalidObjectReference(obj, obj._p_jar) raise InvalidObjectReference(obj, obj._p_jar)
def sortKey(self): def sortKey(self):
# If two connections use the same storage, give them a """Return a consistent sort key for this connection."""
# consistent order using id(). This is unique for the return "%s:%s" % (self._storage.sortKey(), id(self))
# lifetime of a connection, which is good enough.
return "%s:%s" % (self._sortKey(), id(self))
def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
"""Register odb, the DB that this Connection uses.
This method is called by the DB every time a Connection
is opened. Any invalidations received while the Connection
was closed will be processed.
If the global module function resetCaches() was called, the
cache will be cleared.
:Parameters:
- `odb`: database that owns the Connection
- `mvcc`: boolean indicating whether MVCC is enabled
- `txn_mgr`: transaction manager to use. None means
used the default transaction manager.
- `synch`: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
# TODO: Why do we go to all the trouble of setting _db and
# other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB
# and Storage.
self._db = odb
self._storage = odb._storage
self._sortKey = odb._storage.sortKey
self.new_oid = odb._storage.new_oid
self._opened = time()
if synch is not None:
self._synch = synch
if mvcc is not None:
self._mvcc = mvcc
self._txn_mgr = txn_mgr or transaction.manager
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
if self._synch:
self._txn_mgr.registerSynch(self)
self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory)
def _resetCache(self):
"""Creates a new cache, discarding the old one.
See the docstring for the resetCaches() function.
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
def abort(self, transaction): def abort(self, transaction):
"""Abort modifications to registered objects. """Abort a transaction and forget all changes."""
This tells the cache to invalidate changed objects. _p_jar
and _p_oid are deleted from new objects.
"""
for obj in self._registered_objects: for obj in self._registered_objects:
oid = obj._p_oid oid = obj._p_oid
assert oid is not None assert oid is not None
...@@ -475,70 +226,22 @@ class Connection(ExportImport, object): ...@@ -475,70 +226,22 @@ class Connection(ExportImport, object):
self._tpc_cleanup() self._tpc_cleanup()
# Should there be a way to call incrgc directly? # TODO: we should test what happens when cacheGC is called mid-transaction.
# Perhaps "full sweep" should do that?
# TODO: we should test what happens when these methods are called
# mid-transaction.
def cacheFullSweep(self, dt=None):
deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn
it into a ghost. It is possible for individual objects to
remain active.
:Parameters:
- `dt`: ignored. It is provided only for backwards compatibility.
"""
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
def cacheGC(self): def cacheGC(self):
"""Reduce cache size to target size. """Reduce cache size to target size."""
Call _p_deactivate() on cached objects until the cache size
falls under the target size.
"""
self._cache.incrgc() self._cache.incrgc()
__onCloseCallbacks = None __onCloseCallbacks = None
def onCloseCallback(self, f): def onCloseCallback(self, f):
"""Register a callable, f, to be called by close(). """Register a callable, f, to be called by close()."""
The callable, f, will be called at most once, the next time
the Connection is closed.
:Parameters:
- `f`: object that will be called on `close`
"""
if self.__onCloseCallbacks is None: if self.__onCloseCallbacks is None:
self.__onCloseCallbacks = [] self.__onCloseCallbacks = []
self.__onCloseCallbacks.append(f) self.__onCloseCallbacks.append(f)
def close(self): def close(self):
"""Close the Connection. """Close the Connection."""
A closed Connection should not be used by client code. It
can't load or store objects. Objects in the cache are not
freed, because Connections are re-used and the cache are
expected to be useful to the next client.
When the Connection is closed, all callbacks registered by
onCloseCallback() are invoked and the cache is scanned for
old objects.
"""
if not self._needs_to_join: if not self._needs_to_join:
# We're currently joined to a transaction. # We're currently joined to a transaction.
raise ConnectionStateError("Cannot close a connection joined to " raise ConnectionStateError("Cannot close a connection joined to "
...@@ -575,7 +278,10 @@ class Connection(ExportImport, object): ...@@ -575,7 +278,10 @@ class Connection(ExportImport, object):
# assert that here, because self may have been reused (by # assert that here, because self may have been reused (by
# another thread) by the time we get back here. # another thread) by the time we get back here.
# transaction.interfaces.IDataManager
def commit(self, transaction): def commit(self, transaction):
"""Commit changes to an object"""
if self._import: if self._import:
# TODO: This code seems important for Zope, but needs docs # TODO: This code seems important for Zope, but needs docs
# to explain why. # to explain why.
...@@ -636,8 +342,20 @@ class Connection(ExportImport, object): ...@@ -636,8 +342,20 @@ class Connection(ExportImport, object):
self._modified.append(oid) self._modified.append(oid)
p = writer.serialize(obj) # This calls __getstate__ of obj p = writer.serialize(obj) # This calls __getstate__ of obj
s = self._storage.store(oid, serial, p, self._version, transaction) if IBlob.providedBy(obj):
if not IBlobStorage.providedBy(self._storage):
raise Unsupported(
"Storing Blobs in %s is not supported." %
repr(self._storage))
s = self._storage.storeBlob(oid, serial, p,
obj._p_blob_uncommitted,
self._version, transaction)
else:
s = self._storage.store(oid, serial, p, self._version,
transaction)
self._store_count += 1 self._store_count += 1
# Put the object in the cache before handling the # Put the object in the cache before handling the
# response, just in case the response contains the # response, just in case the response contains the
# serial number for a newly created object # serial number for a newly created object
...@@ -652,9 +370,17 @@ class Connection(ExportImport, object): ...@@ -652,9 +370,17 @@ class Connection(ExportImport, object):
raise raise
self._handle_serial(s, oid) self._handle_serial(s, oid)
if IBlob.providedBy(obj):
# We need to update internals of the blobs here
obj._p_blob_uncommitted = None
obj._p_blob_data = \
self._storage.loadBlob(oid, obj._p_serial,
self._version )
def commit_sub(self, t): def commit_sub(self, t):
"""Commit all work done in all subtransactions for this transaction.""" """Commit all changes made in subtransactions and begin 2-phase commit
"""
if self._tmp is None: if self._tmp is None:
return return
src = self._storage src = self._storage
...@@ -671,11 +397,16 @@ class Connection(ExportImport, object): ...@@ -671,11 +397,16 @@ class Connection(ExportImport, object):
for oid in oids: for oid in oids:
data, serial = src.load(oid, src) data, serial = src.load(oid, src)
s = self._storage.store(oid, serial, data, self._version, t) blobfile = src.loadBlob(oid, serial, self._version)
if blobfile is not None:
s = self._storage.storeBlob(oid, serial, data, blobfile,
self._version, t)
else:
s = self._storage.store(oid, serial, data, self._version, t)
self._handle_serial(s, oid, change=False) self._handle_serial(s, oid, change=False)
def abort_sub(self, t): def abort_sub(self, t):
"""Abort work done in all subtransactions for this transaction.""" """Discard all subtransaction data."""
if self._tmp is None: if self._tmp is None:
return return
src = self._storage src = self._storage
...@@ -686,7 +417,7 @@ class Connection(ExportImport, object): ...@@ -686,7 +417,7 @@ class Connection(ExportImport, object):
self._invalidate_creating(src._creating) self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None): def _invalidate_creating(self, creating=None):
"""Dissown any objects newly saved in an uncommitted transaction.""" """Disown any objects newly saved in an uncommitted transaction."""
if creating is None: if creating is None:
creating = self._creating creating = self._creating
self._creating = [] self._creating = []
...@@ -698,42 +429,6 @@ class Connection(ExportImport, object): ...@@ -698,42 +429,6 @@ class Connection(ExportImport, object):
del o._p_jar del o._p_jar
del o._p_oid del o._p_oid
def db(self):
return self._db
def getVersion(self):
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._version
def isReadOnly(self):
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects is accessed by
the current transaction, the revision written before C{tid}
will be used.
The DB calls this method, even when the Connection is closed.
:Parameters:
- `tid`: the storage-level id of the transaction that committed
- `oids`: oids is a set of oids, represented as a dict with oids
as keys.
"""
self._inv_lock.acquire()
try:
if self._txn_time is None:
self._txn_time = tid
self._invalidated.update(oids)
finally:
self._inv_lock.release()
# The next two methods are callbacks for transaction synchronization. # The next two methods are callbacks for transaction synchronization.
def beforeCompletion(self, txn): def beforeCompletion(self, txn):
...@@ -754,213 +449,35 @@ class Connection(ExportImport, object): ...@@ -754,213 +449,35 @@ class Connection(ExportImport, object):
# Now is a good time to collect some garbage # Now is a good time to collect some garbage
self._cache.incrgc() self._cache.incrgc()
def modifiedInVersion(self, oid):
try:
return self._db.modifiedInVersion(oid)
except KeyError:
return self._version
def register(self, obj):
"""Register obj with the current transaction manager.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
obj must be an object loaded from this Connection.
"""
assert obj._p_jar is self
if obj._p_oid is None:
# There is some old Zope code that assigns _p_jar
# directly. That is no longer allowed, but we need to
# provide support for old code that still does it.
# The actual complaint here is that an object without
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
deprecated36("Assigning to _p_jar is deprecated, and will be "
"changed to raise an exception.")
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
self._register(obj)
def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj)
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
def root(self): def root(self):
"""Return the database root object. """Return the database root object."""
The root is a persistent.mapping.PersistentMapping.
"""
return self.get(z64) return self.get(z64)
def setstate(self, obj): def db(self):
oid = obj._p_oid """Returns a handle to the database this connection belongs to."""
return self._db
def isReadOnly(self):
"""Returns True if the storage for this connection is read only."""
if self._storage is None: if self._storage is None:
msg = ("Shouldn't load state for %s " raise ConnectionStateError("The database connection is closed")
"when the connection is closed" % oid_repr(oid)) return self._storage.isReadOnly()
self._log.error(msg)
raise ConnectionStateError(msg)
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
self._inv_lock.acquire()
try: try:
self._setstate(obj) if self._txn_time is None:
except ConflictError: self._txn_time = tid
raise self._invalidated.update(oids)
except: finally:
self._log.error("Couldn't load state for %s", oid_repr(oid), self._inv_lock.release()
exc_info=sys.exc_info())
raise
def _setstate(self, obj):
# Helper for setstate(), which provides logging of failures.
# The control flow is complicated here to avoid loading an # IDataManager
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.
# If an object has been invalidated, there are several cases def tpc_begin(self, transaction, sub=False):
# to consider: """Begin commit of a transaction, starting the two-phase commit."""
# 1. Check _p_independent() self._modified = []
# 2. Try MVCC
# 3. Raise ConflictError.
# Does anything actually use _p_independent()? It would simplify
# the code if we could drop support for it.
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if (obj._p_oid in self._invalidated
and not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
p, serial = self._storage.load(obj._p_oid, self._version)
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = obj._p_oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
if myhasattr(obj, "_p_independent"):
# This call will raise a ReadConflictError if something
# goes wrong
self._handle_independent(obj)
else:
self._load_before_or_conflict(obj)
return
self._reader.setGhostState(obj, p)
obj._p_serial = serial
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start
return True
def _handle_independent(self, obj):
# Helper method for setstate() handles possibly independent objects
# Call _p_independent(), if it returns True, setstate() wins.
# Otherwise, raise a ConflictError.
if obj._p_independent():
self._inv_lock.acquire()
try:
try:
del self._invalidated[obj._p_oid]
except KeyError:
pass
finally:
self._inv_lock.release()
else:
self._conflicts[obj._p_oid] = 1
self._register(obj)
raise ReadConflictError(object=obj)
def oldstate(self, obj, tid):
"""Return copy of obj that was written by tid.
The returned object does not have the typical metadata
(_p_jar, _p_oid, _p_serial) set. I'm not sure how references
to other peristent objects are handled.
:return: a persistent object
:Parameters:
- `obj`: a persistent object from this Connection.
- `tid`: id of a transaction that wrote an earlier revision.
:Exceptions:
- `KeyError`: if tid does not exist or if tid deleted a revision
of obj.
"""
assert obj._p_jar is self
p = self._storage.loadSerial(obj._p_oid, tid)
return self._reader.getState(p)
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called the cache when an object of type type is invalidated.
try:
oid = obj._p_oid
p, serial = self._storage.load(oid, self._version)
# We call getGhost(), but we actually get a non-ghost back.
# The object is a class, which can't actually be ghosted.
copy = self._reader.getGhost(p)
obj.__dict__.clear()
obj.__dict__.update(copy.__dict__)
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = 0
obj._p_serial = serial
except:
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def tpc_begin(self, transaction, sub=False):
self._modified = []
# _creating is a list of oids of new objects, which is used to # _creating is a list of oids of new objects, which is used to
# remove them from the cache if a transaction aborts. # remove them from the cache if a transaction aborts.
...@@ -973,6 +490,7 @@ class Connection(ExportImport, object): ...@@ -973,6 +490,7 @@ class Connection(ExportImport, object):
self._storage.tpc_begin(transaction) self._storage.tpc_begin(transaction)
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
"""Verify that a data manager can commit the transaction."""
try: try:
vote = self._storage.tpc_vote vote = self._storage.tpc_vote
except AttributeError: except AttributeError:
...@@ -1023,12 +541,7 @@ class Connection(ExportImport, object): ...@@ -1023,12 +541,7 @@ class Connection(ExportImport, object):
obj._p_serial = serial obj._p_serial = serial
def tpc_finish(self, transaction): def tpc_finish(self, transaction):
# It's important that the storage calls the function we pass """Indicate confirmation that the transaction is done."""
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
if self._tmp is not None: if self._tmp is not None:
# Commiting a subtransaction! # Commiting a subtransaction!
# There is no need to invalidate anything. # There is no need to invalidate anything.
...@@ -1045,6 +558,7 @@ class Connection(ExportImport, object): ...@@ -1045,6 +558,7 @@ class Connection(ExportImport, object):
self._tpc_cleanup() self._tpc_cleanup()
def tpc_abort(self, transaction): def tpc_abort(self, transaction):
"""Abort a transaction."""
if self._import: if self._import:
self._import = None self._import = None
self._storage.tpc_abort(transaction) self._storage.tpc_abort(transaction)
...@@ -1056,16 +570,16 @@ class Connection(ExportImport, object): ...@@ -1056,16 +570,16 @@ class Connection(ExportImport, object):
del obj._p_jar del obj._p_jar
self._tpc_cleanup() self._tpc_cleanup()
# Common cleanup actions after tpc_finish/tpc_abort.
def _tpc_cleanup(self): def _tpc_cleanup(self):
"""Performs cleanup operations to support tpc_finish and tpc_abort."""
self._conflicts.clear() self._conflicts.clear()
if not self._synch: if not self._synch:
self._flush_invalidations() self._flush_invalidations()
self._needs_to_join = True self._needs_to_join = True
self._registered_objects = [] self._registered_objects = []
def sync(self): def sync(self):
"""Manually update the view on the database."""
self._txn_mgr.get().abort() self._txn_mgr.get().abort()
sync = getattr(self._storage, 'sync', 0) sync = getattr(self._storage, 'sync', 0)
if sync: if sync:
...@@ -1073,22 +587,311 @@ class Connection(ExportImport, object): ...@@ -1073,22 +587,311 @@ class Connection(ExportImport, object):
self._flush_invalidations() self._flush_invalidations()
def getDebugInfo(self): def getDebugInfo(self):
"""Returns a tuple with different items for debugging the
connection.
"""
return self._debug_info return self._debug_info
def setDebugInfo(self, *args): def setDebugInfo(self, *args):
"""Add the given items to the debug information of this connection."""
self._debug_info = self._debug_info + args self._debug_info = self._debug_info + args
def getTransferCounts(self, clear=False): def getTransferCounts(self, clear=False):
"""Returns the number of objects loaded and stored. """Returns the number of objects loaded and stored."""
If clear is True, reset the counters.
"""
res = self._load_count, self._store_count res = self._load_count, self._store_count
if clear: if clear:
self._load_count = 0 self._load_count = 0
self._store_count = 0 self._store_count = 0
return res return res
##############################################
# persistent.interfaces.IPersistentDatamanager
def oldstate(self, obj, tid):
"""Return copy of 'obj' that was written by transaction 'tid'."""
assert obj._p_jar is self
p = self._storage.loadSerial(obj._p_oid, tid)
return self._reader.getState(p)
def setstate(self, obj):
"""Turns the ghost 'obj' into a real object by loading it's from the
database."""
oid = obj._p_oid
if self._storage is None:
msg = ("Shouldn't load state for %s "
"when the connection is closed" % oid_repr(oid))
self._log.error(msg)
raise ConnectionStateError(msg)
try:
self._setstate(obj)
except ConflictError:
raise
except:
self._log.error("Couldn't load state for %s", oid_repr(oid),
exc_info=sys.exc_info())
raise
def _setstate(self, obj):
# Helper for setstate(), which provides logging of failures.
# The control flow is complicated here to avoid loading an
# object revision that we are sure we aren't going to use. As
# a result, invalidation tests occur before and after the
# load. We can only be sure about invalidations after the
# load.
# If an object has been invalidated, there are several cases
# to consider:
# 1. Check _p_independent()
# 2. Try MVCC
# 3. Raise ConflictError.
# Does anything actually use _p_independent()? It would simplify
# the code if we could drop support for it.
# There is a harmless data race with self._invalidated. A
# dict update could go on in another thread, but we don't care
# because we have to check again after the load anyway.
if (obj._p_oid in self._invalidated
and not myhasattr(obj, "_p_independent")):
# If the object has _p_independent(), we will handle it below.
self._load_before_or_conflict(obj)
return
oid = obj._p_oid
p, serial = self._storage.load(oid, self._version)
self._load_count += 1
self._inv_lock.acquire()
try:
invalid = oid in self._invalidated
finally:
self._inv_lock.release()
if invalid:
if myhasattr(obj, "_p_independent"):
# This call will raise a ReadConflictError if something
# goes wrong
self._handle_independent(obj)
else:
self._load_before_or_conflict(obj)
return
self._reader.setGhostState(obj, p)
obj._p_serial = serial
# Blob support
if IBlob.providedBy(obj):
obj._p_blob_data = \
self._storage.loadBlob(oid, serial, self._version)
def _load_before_or_conflict(self, obj):
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
"""Set state using non-current data.
Return True if state was available, False if not.
"""
try:
# Load data that was current before the commit at txn_time.
t = self._storage.loadBefore(obj._p_oid, self._txn_time)
except KeyError:
return False
if t is None:
return False
data, start, end = t
# The non-current transaction must have been written before
# txn_time. It must be current at txn_time, but could have
# been modified at txn_time.
assert start < self._txn_time, (u64(start), u64(self._txn_time))
assert end is not None
assert self._txn_time <= end, (u64(self._txn_time), u64(end))
self._reader.setGhostState(obj, data)
obj._p_serial = start
return True
def _handle_independent(self, obj):
# Helper method for setstate() handles possibly independent objects
# Call _p_independent(), if it returns True, setstate() wins.
# Otherwise, raise a ConflictError.
if obj._p_independent():
self._inv_lock.acquire()
try:
try:
del self._invalidated[obj._p_oid]
except KeyError:
pass
finally:
self._inv_lock.release()
else:
self._conflicts[obj._p_oid] = 1
self._register(obj)
raise ReadConflictError(object=obj)
def register(self, obj):
"""Register obj with the current transaction manager.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
obj must be an object loaded from this Connection.
"""
assert obj._p_jar is self
if obj._p_oid is None:
# There is some old Zope code that assigns _p_jar
# directly. That is no longer allowed, but we need to
# provide support for old code that still does it.
# The actual complaint here is that an object without
# an oid is being registered. I can't think of any way to
# achieve that without assignment to _p_jar. If there is
# a way, this will be a very confusing warning.
deprecated36("Assigning to _p_jar is deprecated, and will be "
"changed to raise an exception.")
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
self._register(obj)
def _register(self, obj=None):
if obj is not None:
self._registered_objects.append(obj)
if self._needs_to_join:
self._txn_mgr.get().join(self)
self._needs_to_join = False
# PROTECTED stuff (used by e.g. ZODB.DB.DB)
def _cache_items(self):
# find all items on the lru list
items = self._cache.lru_items()
# fine everything. some on the lru list, some not
everything = self._cache.cache_data
# remove those items that are on the lru list
for k,v in items:
del everything[k]
# return a list of [ghosts....not recently used.....recently used]
return everything.items() + items
def _setDB(self, odb, mvcc=None, txn_mgr=None, synch=None):
"""Register odb, the DB that this Connection uses.
This method is called by the DB every time a Connection
is opened. Any invalidations received while the Connection
was closed will be processed.
If the global module function resetCaches() was called, the
cache will be cleared.
Parameters:
odb: database that owns the Connection
mvcc: boolean indicating whether MVCC is enabled
txn_mgr: transaction manager to use. None means
used the default transaction manager.
synch: boolean indicating whether Connection should
register for afterCompletion() calls.
"""
# TODO: Why do we go to all the trouble of setting _db and
# other attributes on open and clearing them on close?
# A Connection is only ever associated with a single DB
# and Storage.
self._db = odb
self._storage = odb._storage
self.new_oid = odb._storage.new_oid
self._opened = time()
if synch is not None:
self._synch = synch
if mvcc is not None:
self._mvcc = mvcc
self._txn_mgr = txn_mgr or transaction.manager
if self._reset_counter != global_reset_counter:
# New code is in place. Start a new cache.
self._resetCache()
else:
self._flush_invalidations()
if self._synch:
self._txn_mgr.registerSynch(self)
self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory)
# Multi-database support
self.connections = {self._db.database_name: self}
def _resetCache(self):
"""Creates a new cache, discarding the old one.
See the docstring for the resetCaches() function.
"""
self._reset_counter = global_reset_counter
self._invalidated.clear()
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
# Python protocol
def __repr__(self):
if self._version:
ver = ' (in version %s)' % `self._version`
else:
ver = ''
return '<Connection at %08x%s>' % (positive_id(self), ver)
# DEPRECATION candidates
__getitem__ = get
def modifiedInVersion(self, oid):
"""Returns the version the object with the given oid was modified in.
If it wasn't modified in a version, the current version of this
connection is returned.
"""
try:
return self._db.modifiedInVersion(oid)
except KeyError:
return self.getVersion()
def getVersion(self):
"""Returns the version this connection is attached to."""
if self._storage is None:
raise ConnectionStateError("The database connection is closed")
return self._version
def setklassstate(self, obj):
# Special case code to handle ZClasses, I think.
# Called the cache when an object of type type is invalidated.
try:
oid = obj._p_oid
p, serial = self._storage.load(oid, self._version)
# We call getGhost(), but we actually get a non-ghost back.
# The object is a class, which can't actually be ghosted.
copy = self._reader.getGhost(p)
obj.__dict__.clear()
obj.__dict__.update(copy.__dict__)
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = 0
obj._p_serial = serial
except:
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def exchange(self, old, new): def exchange(self, old, new):
# called by a ZClasses method that isn't executed by the test suite # called by a ZClasses method that isn't executed by the test suite
oid = old._p_oid oid = old._p_oid
...@@ -1097,3 +900,52 @@ class Connection(ExportImport, object): ...@@ -1097,3 +900,52 @@ class Connection(ExportImport, object):
new._p_changed = 1 new._p_changed = 1
self._register(new) self._register(new)
self._cache[oid] = new self._cache[oid] = new
# DEPRECATED methods
def getTransaction(self):
"""Get the current transaction for this connection.
:deprecated:
The transaction manager's get method works the same as this
method. You can pass a transaction manager (TM) to DB.open()
to control which TM the Connection uses.
"""
deprecated36("getTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
return self._txn_mgr.get()
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread.
:deprecated:
Returns the transaction manager used by the connection. You
can pass a transaction manager (TM) to DB.open() to control
which TM the Connection uses.
"""
deprecated36("setLocalTransaction() is deprecated. "
"Use the txn_mgr argument to DB.open() instead.")
if self._txn_mgr is transaction.manager:
if self._synch:
self._txn_mgr.unregisterSynch(self)
self._txn_mgr = transaction.TransactionManager()
if self._synch:
self._txn_mgr.registerSynch(self)
return self._txn_mgr
def cacheFullSweep(self, dt=None):
deprecated36("cacheFullSweep is deprecated. "
"Use cacheMinimize instead.")
if dt is None:
self._cache.full_sweep()
else:
self._cache.full_sweep(dt)
def cacheMinimize(self, dt=DEPRECATED_ARGUMENT):
"""Deactivate all unmodified objects in the cache."""
if dt is not DEPRECATED_ARGUMENT:
deprecated36("cacheMinimize() dt= is ignored.")
self._cache.minimize()
...@@ -27,6 +27,9 @@ from ZODB.serialize import referencesf ...@@ -27,6 +27,9 @@ from ZODB.serialize import referencesf
from ZODB.utils import WeakSet from ZODB.utils import WeakSet
from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36 from ZODB.utils import DEPRECATED_ARGUMENT, deprecated36
from zope.interface import implements
from ZODB.interfaces import IDatabase
import transaction import transaction
logger = logging.getLogger('ZODB.DB') logger = logging.getLogger('ZODB.DB')
...@@ -178,6 +181,7 @@ class DB(object): ...@@ -178,6 +181,7 @@ class DB(object):
setCacheDeactivateAfter, setCacheDeactivateAfter,
getVersionCacheDeactivateAfter, setVersionCacheDeactivateAfter getVersionCacheDeactivateAfter, setVersionCacheDeactivateAfter
""" """
implements(IDatabase)
klass = Connection # Class to use for connections klass = Connection # Class to use for connections
_activity_monitor = None _activity_monitor = None
...@@ -188,6 +192,8 @@ class DB(object): ...@@ -188,6 +192,8 @@ class DB(object):
cache_deactivate_after=DEPRECATED_ARGUMENT, cache_deactivate_after=DEPRECATED_ARGUMENT,
version_pool_size=3, version_pool_size=3,
version_cache_size=100, version_cache_size=100,
database_name='unnamed',
databases=None,
version_cache_deactivate_after=DEPRECATED_ARGUMENT, version_cache_deactivate_after=DEPRECATED_ARGUMENT,
): ):
"""Create an object database. """Create an object database.
...@@ -248,6 +254,16 @@ class DB(object): ...@@ -248,6 +254,16 @@ class DB(object):
storage.tpc_vote(t) storage.tpc_vote(t)
storage.tpc_finish(t) storage.tpc_finish(t)
# Multi-database setup.
if databases is None:
databases = {}
self.databases = databases
self.database_name = database_name
if database_name in databases:
raise ValueError("database_name %r already in databases" %
database_name)
databases[database_name] = self
# Pass through methods: # Pass through methods:
for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog', for m in ['history', 'supportsUndo', 'supportsVersions', 'undoLog',
'versionEmpty', 'versions']: 'versionEmpty', 'versions']:
...@@ -565,7 +581,7 @@ class DB(object): ...@@ -565,7 +581,7 @@ class DB(object):
def get_info(c): def get_info(c):
# `result`, `time` and `version` are lexically inherited. # `result`, `time` and `version` are lexically inherited.
o = c._opened o = c._opened
d = c._debug_info d = c.getDebugInfo()
if d: if d:
if len(d) == 1: if len(d) == 1:
d = d[0] d = d[0]
......
...@@ -547,6 +547,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -547,6 +547,7 @@ class FileStorage(BaseStorage.BaseStorage,
self._lock_release() self._lock_release()
def load(self, oid, version): def load(self, oid, version):
"""Return pickle data and serial number."""
self._lock_acquire() self._lock_acquire()
try: try:
pos = self._lookup_pos(oid) pos = self._lookup_pos(oid)
...@@ -629,7 +630,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -629,7 +630,7 @@ class FileStorage(BaseStorage.BaseStorage,
finally: finally:
self._lock_release() self._lock_release()
def store(self, oid, serial, data, version, transaction): def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only: if self._is_read_only:
raise POSException.ReadOnlyError() raise POSException.ReadOnlyError()
if transaction is not self._transaction: if transaction is not self._transaction:
...@@ -652,12 +653,12 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -652,12 +653,12 @@ class FileStorage(BaseStorage.BaseStorage,
pnv = h.pnv pnv = h.pnv
cached_tid = h.tid cached_tid = h.tid
if serial != cached_tid: if oldserial != cached_tid:
rdata = self.tryToResolveConflict(oid, cached_tid, rdata = self.tryToResolveConflict(oid, cached_tid,
serial, data) oldserial, data)
if rdata is None: if rdata is None:
raise POSException.ConflictError( raise POSException.ConflictError(
oid=oid, serials=(cached_tid, serial), data=data) oid=oid, serials=(cached_tid, oldserial), data=data)
else: else:
data = rdata data = rdata
...@@ -687,7 +688,7 @@ class FileStorage(BaseStorage.BaseStorage, ...@@ -687,7 +688,7 @@ class FileStorage(BaseStorage.BaseStorage,
raise FileStorageQuotaError( raise FileStorageQuotaError(
"The storage quota has been exceeded.") "The storage quota has been exceeded.")
if old and serial != cached_tid: if old and oldserial != cached_tid:
return ConflictResolution.ResolvedSerial return ConflictResolution.ResolvedSerial
else: else:
return self._tid return self._tid
......
...@@ -68,16 +68,16 @@ ...@@ -68,16 +68,16 @@
# #
# - 8-byte data length # - 8-byte data length
# #
# ? 8-byte position of non-version data # ? 8-byte position of non-version data record
# (if version length > 0) # (if version length > 0)
# #
# ? 8-byte position of previous record in this version # ? 8-byte position of previous record in this version
# (if version length > 0) # (if version length > 0)
# #
# ? version string # ? version string
# (if version length > 0) # (if version length > 0)
# #
# ? data # ? data
# (data length > 0) # (data length > 0)
# #
# ? 8-byte position of data record containing data # ? 8-byte position of data record containing data
......
...@@ -12,8 +12,11 @@ ...@@ -12,8 +12,11 @@
# #
############################################################################## ##############################################################################
from zope.interface import implements
from ZODB.Blobs.interfaces import IBlobStorage
from ZODB import POSException from ZODB import POSException
from ZODB.utils import p64, u64, z64 from ZODB.utils import p64, u64, z64, cp
import tempfile import tempfile
...@@ -22,6 +25,8 @@ class TmpStore: ...@@ -22,6 +25,8 @@ class TmpStore:
_bver = '' _bver = ''
implements(IBlobStorage)
def __init__(self, base_version, storage): def __init__(self, base_version, storage):
self._transaction = None self._transaction = None
self._storage = storage self._storage = storage
...@@ -37,6 +42,8 @@ class TmpStore: ...@@ -37,6 +42,8 @@ class TmpStore:
self._tindex = {} self._tindex = {}
self._creating = [] self._creating = []
self.blob_files = {}
def close(self): def close(self):
self._file.close() self._file.close()
...@@ -61,6 +68,9 @@ class TmpStore: ...@@ -61,6 +68,9 @@ class TmpStore:
serial = h[:8] serial = h[:8]
return self._file.read(size), serial return self._file.read(size), serial
def sortKey(self):
return self._storage.sortKey()
# TODO: clarify difference between self._storage & self._db._storage # TODO: clarify difference between self._storage & self._db._storage
def modifiedInVersion(self, oid): def modifiedInVersion(self, oid):
...@@ -119,5 +129,27 @@ class TmpStore: ...@@ -119,5 +129,27 @@ class TmpStore:
def versionEmpty(self, version): def versionEmpty(self, version):
# TODO: what is this supposed to do? # TODO: what is this supposed to do?
# NOTE: This appears to implement the opposite of what it should.
if version == self._bver: if version == self._bver:
return len(self._index) return len(self._index)
# Blob support
def loadBlob(self, oid, serial, version):
return self.blob_files.get(oid)
def storeBlob(self, oid, oldserial, data, blobfile, version, transaction):
result = self.store(oid, oldserial, data, version, transaction)
target = file(self.generateBlobFile(oid), "w")
src = file(blobfile, "r")
cp(src, target)
return result
def generateBlobFile(self, oid):
if not self.blob_files.has_key(oid):
handle, name = tempfile.mkstemp()
handle.close()
self.blob_files[oid] = name
return self.blob_files[oid]
...@@ -158,4 +158,15 @@ ...@@ -158,4 +158,15 @@
<key name="version-cache-size" datatype="integer" default="100"/> <key name="version-cache-size" datatype="integer" default="100"/>
</sectiontype> </sectiontype>
<sectiontype name="blobfilestorage" datatype=".BlobFileStorage"
implements="ZODB.storage" extends="filestorage">
<key name="blob-dir" required="yes">
<description>
Path name to the blob storage directory.
</description>
</key>
</sectiontype>
</component> </component>
...@@ -132,6 +132,15 @@ class FileStorage(BaseConfig): ...@@ -132,6 +132,15 @@ class FileStorage(BaseConfig):
read_only=self.config.read_only, read_only=self.config.read_only,
quota=self.config.quota) quota=self.config.quota)
class BlobFileStorage(FileStorage):
def open(self):
from ZODB.Blobs.BlobStorage import BlobStorage
base_storage = FileStorage.open(self)
return BlobStorage(self.config.blob_dir, base_storage)
class ZEOClient(BaseConfig): class ZEOClient(BaseConfig):
def open(self): def open(self):
......
...@@ -16,14 +16,122 @@ ...@@ -16,14 +16,122 @@
$Id$ $Id$
""" """
import zope.interface from zope.interface import Interface, Attribute
class IConnection(zope.interface.Interface): class IConnection(Interface):
"""ZODB connection. """Connection to ZODB for loading and storing objects.
TODO: This interface is incomplete. The Connection object serves as a data manager. The root() method
on a Connection returns the root object for the database. This
object and all objects reachable from it are associated with the
Connection that loaded them. When a transaction commits, it uses
the Connection to store modified objects.
Typical use of ZODB is for each thread to have its own
Connection and that no thread should have more than one Connection
to the same database. A thread is associated with a Connection by
loading objects from that Connection. Objects loaded by one
thread should not be used by another thread.
A Connection can be associated with a single version when it is
created. By default, a Connection is not associated with a
version; it uses non-version data.
Each Connection provides an isolated, consistent view of the
database, by managing independent copies of objects in the
database. At transaction boundaries, these copies are updated to
reflect the current state of the database.
You should not instantiate this class directly; instead call the
open() method of a DB instance.
In many applications, root() is the only method of the Connection
that you will need to use.
Synchronization
---------------
A Connection instance is not thread-safe. It is designed to
support a thread model where each thread has its own transaction.
If an application has more than one thread that uses the
connection or the transaction the connection is registered with,
the application should provide locking.
The Connection manages movement of objects in and out of object
storage.
TODO: We should document an intended API for using a Connection via
multiple threads.
TODO: We should explain that the Connection has a cache and that
multiple calls to get() will return a reference to the same
object, provided that one of the earlier objects is still
referenced. Object identity is preserved within a connection, but
not across connections.
TODO: Mention the database pool.
A database connection always presents a consistent view of the
objects in the database, although it may not always present the
most current revision of any particular object. Modifications
made by concurrent transactions are not visible until the next
transaction boundary (abort or commit).
Two options affect consistency. By default, the mvcc and synch
options are enabled by default.
If you pass mvcc=True to db.open(), the Connection will never read
non-current revisions of an object. Instead it will raise a
ReadConflictError to indicate that the current revision is
unavailable because it was written after the current transaction
began.
The logic for handling modifications assumes that the thread that
opened a Connection (called db.open()) is the thread that will use
the Connection. If this is not true, you should pass synch=False
to db.open(). When the synch option is disabled, some transaction
boundaries will be missed by the Connection; in particular, if a
transaction does not involve any modifications to objects loaded
from the Connection and synch is disabled, the Connection will
miss the transaction boundary. Two examples of this behavior are
db.undo() and read-only transactions.
Groups of methods:
User Methods:
root, get, add, close, db, sync, isReadOnly, cacheGC, cacheFullSweep,
cacheMinimize, getVersion, modifiedInVersion
Experimental Methods:
onCloseCallbacks
Database Invalidation Methods:
invalidate
Other Methods: exchange, getDebugInfo, setDebugInfo,
getTransferCounts
""" """
def __init__(version='', cache_size=400,
cache_deactivate_after=None, mvcc=True, txn_mgr=None,
synch=True):
"""Create a new Connection.
A Connection instance should by instantiated by the DB
instance that it is connected to.
Parameters:
version: the "version" that all changes will be made in, defaults
to no version.
cache_size: the target size of the in-memory object cache, measured
in objects.
mvcc: boolean indicating whether MVCC is enabled
txn_mgr: transaction manager to use. None means used the default
transaction manager.
synch: boolean indicating whether Connection should register for
afterCompletion() calls.
"""
def add(ob): def add(ob):
"""Add a new object 'obj' to the database and assign it an oid. """Add a new object 'obj' to the database and assign it an oid.
...@@ -38,15 +146,330 @@ class IConnection(zope.interface.Interface): ...@@ -38,15 +146,330 @@ class IConnection(zope.interface.Interface):
The object is added when the transaction commits. The object The object is added when the transaction commits. The object
must implement the IPersistent interface and must not must implement the IPersistent interface and must not
already be associated with a Connection. already be associated with a Connection.
Parameters:
obj: a Persistent object
Raises TypeError if obj is not a persistent object.
Raises InvalidObjectReference if obj is already associated with another
connection.
Raises ConnectionStateError if the connection is closed.
"""
def get(oid):
"""Return the persistent object with oid 'oid'.
If the object was not in the cache and the object's class is
ghostable, then a ghost will be returned. If the object is
already in the cache, a reference to the cached object will be
returned.
Applications seldom need to call this method, because objects
are loaded transparently during attribute lookup.
Parameters:
oid: an object id
Raises KeyError if oid does not exist.
It is possible that an object does not exist as of the current
transaction, but existed in the past. It may even exist again in
the future, if the transaction that removed it is undone.
Raises ConnectionStateError if the connection is closed.
"""
def cacheMinimize():
"""Deactivate all unmodified objects in the cache.
Call _p_deactivate() on each cached object, attempting to turn
it into a ghost. It is possible for individual objects to
remain active.
"""
def cacheGC():
"""Reduce cache size to target size.
Call _p_deactivate() on cached objects until the cache size
falls under the target size.
""" """
class IBlobStorage(zope.interface.Interface): def onCloseCallback(f):
"""A storage supporting BLOBs.""" """Register a callable, f, to be called by close().
def storeBlob(oid, serial, data, blob, version, transaction): f will be called with no arguments before the Connection is closed.
"""Stores data that has a BLOB attached."""
def loadBlob(oid, serial, version, blob): Parameters:
"""Loads the BLOB data for 'oid' into the given blob object. f: method that will be called on `close`
""" """
def close():
"""Close the Connection.
When the Connection is closed, all callbacks registered by
onCloseCallback() are invoked and the cache is garbage collected.
A closed Connection should not be used by client code. It can't load
or store objects. Objects in the cache are not freed, because
Connections are re-used and the cache is expected to be useful to the
next client.
"""
def db():
"""Returns a handle to the database this connection belongs to."""
def isReadOnly():
"""Returns True if the storage for this connection is read only."""
def invalidate(tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids.
When the next transaction boundary is reached, objects will be
invalidated. If any of the invalidated objects are accessed by the
current transaction, the revision written before Connection.tid will be
used.
The DB calls this method, even when the Connection is closed.
Parameters:
tid: the storage-level id of the transaction that committed
oids: oids is a set of oids, represented as a dict with oids as keys.
"""
def root():
"""Return the database root object.
The root is a persistent.mapping.PersistentMapping.
"""
def getVersion():
"""Returns the version this connection is attached to."""
# Multi-database support.
connections = Attribute("""\
A mapping from database name to a Connection to that database.
In multi-database use, the Connections of all members of a database
collection share the same .connections object.
In single-database use, of course this mapping contains a single
entry.
""")
# TODO: should this accept all the arguments one may pass to DB.open()?
def get_connection(database_name):
"""Return a Connection for the named database.
This is intended to be called from an open Connection associated with
a multi-database. In that case, database_name must be the name of a
database within the database collection (probably the name of a
different database than is associated with the calling Connection
instance, but it's fine to use the name of the calling Connection
object's database). A Connection for the named database is
returned. If no connection to that database is already open, a new
Connection is opened. So long as the multi-database remains open,
passing the same name to get_connection() multiple times returns the
same Connection object each time.
"""
def sync():
"""Manually update the view on the database.
This includes aborting the current transaction, getting a fresh and
consistent view of the data (synchronizing with the storage if possible)
and call cacheGC() for this connection.
This method was especially useful in ZODB 3.2 to better support
read-only connections that were affected by a couple of problems.
"""
# Debug information
def getDebugInfo():
"""Returns a tuple with different items for debugging the connection.
Debug information can be added to a connection by using setDebugInfo.
"""
def setDebugInfo(*items):
"""Add the given items to the debug information of this connection."""
def getTransferCounts(clear=False):
"""Returns the number of objects loaded and stored.
If clear is True, reset the counters.
"""
class IDatabase(Interface):
"""ZODB DB.
TODO: This interface is incomplete.
"""
def __init__(storage,
pool_size=7,
cache_size=400,
version_pool_size=3,
version_cache_size=100,
database_name='unnamed',
databases=None,
):
"""Create an object database.
storage: the storage used by the database, e.g. FileStorage
pool_size: expected maximum number of open connections
cache_size: target size of Connection object cache, in number of
objects
version_pool_size: expected maximum number of connections (per
version)
version_cache_size: target size of Connection object cache for
version connections, in number of objects
database_name: when using a multi-database, the name of this DB
within the database group. It's a (detected) error if databases
is specified too and database_name is already a key in it.
This becomes the value of the DB's database_name attribute.
databases: when using a multi-database, a mapping to use as the
binding of this DB's .databases attribute. It's intended
that the second and following DB's added to a multi-database
pass the .databases attribute set on the first DB added to the
collection.
"""
databases = Attribute("""\
A mapping from database name to DB (database) object.
In multi-database use, all DB members of a database collection share
the same .databases object.
In single-database use, of course this mapping contains a single
entry.
""")
class IStorage(Interface):
"""A storage is responsible for storing and retrieving data of objects.
"""
def load(oid, version):
"""XXX"""
def close():
"""XXX"""
def cleanup():
"""XXX"""
def lastSerial():
"""XXX"""
def lastTransaction():
"""XXX"""
def lastTid(oid):
"""Return last serialno committed for object oid."""
def loadSerial(oid, serial):
"""XXX"""
def loadBefore(oid, tid):
"""XXX"""
def iterator(start=None, stop=None):
"""XXX"""
def sortKey():
"""XXX"""
def getName():
"""XXX"""
def getSize():
"""XXX"""
def history(oid, version, length=1, filter=None):
"""XXX"""
def new_oid(last=None):
"""XXX"""
def set_max_oid(possible_new_max_oid):
"""XXX"""
def registerDB(db, limit):
"""XXX"""
def isReadOnly():
"""XXX"""
def supportsUndo():
"""XXX"""
def supportsVersions():
"""XXX"""
def tpc_abort(transaction):
"""XXX"""
def tpc_begin(transaction):
"""XXX"""
def tpc_vote(transaction):
"""XXX"""
def tpc_finish(transaction, f=None):
"""XXX"""
def getSerial(oid):
"""XXX"""
def loadSerial(oid, serial):
"""XXX"""
def loadBefore(oid, tid):
"""XXX"""
def getExtensionMethods():
"""XXX"""
def copyTransactionsFrom():
"""XXX"""
def store(oid, oldserial, data, version, transaction):
"""
may return the new serial or not
"""
class IUndoableStorage(IStorage):
def undo(transaction_id, txn):
"""XXX"""
def undoInfo():
"""XXX"""
def undoLog(first, last, filter=None):
"""XXX"""
def pack(t, referencesf):
"""XXX"""
class IVersioningStorage(IStorage):
def abortVersion(src, transaction):
"""XXX"""
def commitVersion(src, dest, transaction):
"""XXX"""
def modifiedInVersion(oid):
"""XXX"""
def versionEmpty(version):
"""XXX"""
def versions(max=None):
"""XXX"""
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Support for testing logging code
If you want to test that your code generates proper log output, you
can create and install a handler that collects output:
>>> handler = InstalledHandler('foo.bar')
The handler is installed into loggers for all of the names passed. In
addition, the logger level is set to 1, which means, log
everything. If you want to log less than everything, you can provide a
level keyword argument. The level setting effects only the named
loggers.
Then, any log output is collected in the handler:
>>> logging.getLogger('foo.bar').exception('eek')
>>> logging.getLogger('foo.bar').info('blah blah')
>>> for record in handler.records:
... print record.name, record.levelname
... print ' ', record.getMessage()
foo.bar ERROR
eek
foo.bar INFO
blah blah
A similar effect can be gotten by just printing the handler:
>>> print handler
foo.bar ERROR
eek
foo.bar INFO
blah blah
After checking the log output, you need to uninstall the handler:
>>> handler.uninstall()
At which point, the handler won't get any more log output.
Let's clear the handler:
>>> handler.clear()
>>> handler.records
[]
And then log something:
>>> logging.getLogger('foo.bar').info('blah')
and, sure enough, we still have no output:
>>> handler.records
[]
$Id: loggingsupport.py 28349 2004-11-06 00:10:32Z tim_one $
"""
import logging
class Handler(logging.Handler):
def __init__(self, *names, **kw):
logging.Handler.__init__(self)
self.names = names
self.records = []
self.setLoggerLevel(**kw)
def setLoggerLevel(self, level=1):
self.level = level
self.oldlevels = {}
def emit(self, record):
self.records.append(record)
def clear(self):
del self.records[:]
def install(self):
for name in self.names:
logger = logging.getLogger(name)
self.oldlevels[name] = logger.level
logger.setLevel(self.level)
logger.addHandler(self)
def uninstall(self):
for name in self.names:
logger = logging.getLogger(name)
logger.setLevel(self.oldlevels[name])
logger.removeHandler(self)
def __str__(self):
return '\n'.join(
[("%s %s\n %s" %
(record.name, record.levelname,
'\n'.join([line
for line in record.getMessage().split('\n')
if line.strip()])
)
)
for record in self.records]
)
class InstalledHandler(Handler):
def __init__(self, *names):
Handler.__init__(self, *names)
self.install()
##############################################################################
#
# Copyright (c) 2005 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
Multi-database tests
====================
Multi-database support adds the ability to tie multiple databases into a
collection. The original proposal is in the fishbowl:
http://www.zope.org/Wikis/ZODB/MultiDatabases/
It was implemented during the PyCon 2005 sprints, but in a simpler form,
by Jim Fulton, Christian Theune,and Tim Peters. Overview:
No private attributes were added, and one new method was introduced.
DB:
- a new .database_name attribute holds the name of this database
- a new .databases attribute maps from database name to DB object; all DBs
in a multi-database collection share the same .databases object
- the DB constructor has new optional arguments with the same names
(database_name= and databases=).
Connection:
- a new .connections attribute maps from database name to a Connection for
the database with that name; the .connections mapping object is also
shared among databases in a collection
- a new .get_connection(database_name) method returns a Connection for a
database in the collection; if a connection is already open, it's returned
(this is the value .connections[database_name]), else a new connection is
opened (and stored as .connections[database_name])
Creating a multi-database starts with creating a named DB:
>>> from ZODB.tests.test_storage import MinimalMemoryStorage
>>> from ZODB import DB
>>> dbmap = {}
>>> db = DB(MinimalMemoryStorage(), database_name='root', databases=dbmap)
The database name is accessible afterwards and in a newly created collection:
>>> db.database_name
'root'
>>> db.databases # doctest: +ELLIPSIS
{'root': <ZODB.DB.DB object at ...>}
>>> db.databases is dbmap
True
Adding another database to the collection works like this:
>>> db2 = DB(MinimalMemoryStorage(),
... database_name='notroot',
... databases=dbmap)
The new db2 now shares the 'databases' dictionary with db and has two entries:
>>> db2.databases is db.databases is dbmap
True
>>> len(db2.databases)
2
>>> names = dbmap.keys(); names.sort(); print names
['notroot', 'root']
It's an error to try to insert a database with a name already in use:
>>> db3 = DB(MinimalMemoryStorage(),
... database_name='root',
... databases=dbmap)
Traceback (most recent call last):
...
ValueError: database_name 'root' already in databases
Because that failed, db.databases wasn't changed:
>>> len(db.databases) # still 2
2
You can (still) get a connection to a database this way:
>>> cn = db.open()
>>> cn # doctest: +ELLIPSIS
<Connection at ...>
This is the only connection in this collection right now:
>>> cn.connections # doctest: +ELLIPSIS
{'root': <Connection at ...>}
Getting a connection to a different database from an existing connection in the
same database collection (this enables 'connection binding' within a given
thread/transaction/context ...):
>>> cn2 = cn.get_connection('notroot')
>>> cn2 # doctest: +ELLIPSIS
<Connection at ...>
Now there are two connections in that collection:
>>> cn2.connections is cn.connections
True
>>> len(cn2.connections)
2
>>> names = cn.connections.keys(); names.sort(); print names
['notroot', 'root']
So long as this database group remains open, the same Connection objects
are returned:
>>> cn.get_connection('root') is cn
True
>>> cn.get_connection('notroot') is cn2
True
>>> cn2.get_connection('root') is cn
True
>>> cn2.get_connection('notroot') is cn2
True
Of course trying to get a connection for a database not in the group raises
an exception:
>>> cn.get_connection('no way')
Traceback (most recent call last):
...
KeyError: 'no way'
Clean up:
>>> for a_db in dbmap.values():
... a_db.close()
...@@ -647,6 +647,8 @@ class StubDatabase: ...@@ -647,6 +647,8 @@ class StubDatabase:
self._storage = StubStorage() self._storage = StubStorage()
classFactory = None classFactory = None
database_name = 'stubdatabase'
databases = {'stubdatabase': database_name}
def invalidate(self, transaction, dict_with_oid_keys, connection): def invalidate(self, transaction, dict_with_oid_keys, connection):
pass pass
......
...@@ -15,4 +15,6 @@ ...@@ -15,4 +15,6 @@
from zope.testing.doctestunit import DocFileSuite from zope.testing.doctestunit import DocFileSuite
def test_suite(): def test_suite():
return DocFileSuite("dbopen.txt") return DocFileSuite("dbopen.txt",
"multidb.txt",
)
...@@ -21,6 +21,8 @@ import cPickle as pickle ...@@ -21,6 +21,8 @@ import cPickle as pickle
from cStringIO import StringIO from cStringIO import StringIO
import weakref import weakref
import warnings import warnings
from tempfile import mkstemp
import os
from persistent.TimeStamp import TimeStamp from persistent.TimeStamp import TimeStamp
...@@ -305,3 +307,10 @@ class WeakSet(object): ...@@ -305,3 +307,10 @@ class WeakSet(object):
# We're cheating by breaking into the internals of Python's # We're cheating by breaking into the internals of Python's
# WeakValueDictionary here (accessing its .data attribute). # WeakValueDictionary here (accessing its .data attribute).
return self.data.data.values() return self.data.data.values()
def mktemp():
"""Create a temp file, known by name, in a semi-secure manner."""
handle, filename = mkstemp()
os.close(handle)
return filename
...@@ -257,18 +257,35 @@ class IPersistentDataManager(Interface): ...@@ -257,18 +257,35 @@ class IPersistentDataManager(Interface):
def setstate(object): def setstate(object):
"""Load the state for the given object. """Load the state for the given object.
The object should be in the ghost state. The object should be in the ghost state. The object's state will be
The object's state will be set and the object will end up set and the object will end up in the saved state.
in the saved state.
The object must provide the IPersistent interface. The object must provide the IPersistent interface.
""" """
def oldstate(obj, tid):
"""Return copy of 'obj' that was written by transaction 'tid'.
The returned object does not have the typical metadata (_p_jar, _p_oid,
_p_serial) set. I'm not sure how references to other peristent objects
are handled.
Parameters
obj: a persistent object from this Connection.
tid: id of a transaction that wrote an earlier revision.
Raises KeyError if tid does not exist or if tid deleted a revision of
obj.
"""
def register(object): def register(object):
"""Register an IPersistent with the current transaction. """Register an IPersistent with the current transaction.
This method must be called when the object transitions to This method must be called when the object transitions to
the changed state. the changed state.
A subclass could override this method to customize the default
policy of one transaction manager for each thread.
""" """
def mtime(object): def mtime(object):
......
...@@ -18,104 +18,7 @@ $Id$ ...@@ -18,104 +18,7 @@ $Id$
import zope.interface import zope.interface
class IResourceManager(zope.interface.Interface): class IDataManager(zope.interface.Interface):
"""Objects that manage resources transactionally.
These objects may manage data for other objects, or they may manage
non-object storages, such as relational databases.
IDataManagerOriginal is the interface currently provided by ZODB
database connections, but the intent is to move to the newer
IDataManager.
"""
# Two-phase commit protocol. These methods are called by the
# ITransaction object associated with the transaction being
# committed.
def tpc_begin(transaction):
"""Begin two-phase commit, to save data changes.
An implementation should do as much work as possible without
making changes permanent. Changes should be made permanent
when tpc_finish is called (or aborted if tpc_abort is called).
The work can be divided between tpc_begin() and tpc_vote(), and
the intent is that tpc_vote() be as fast as possible (to minimize
the period of uncertainty).
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def tpc_vote(transaction):
"""Verify that a resource manager can commit the transaction.
This is the last chance for a resource manager to vote 'no'. A
resource manager votes 'no' by raising an exception.
transaction is the ITransaction instance associated with the
transaction being committed.
"""
def tpc_finish(transaction):
"""Indicate confirmation that the transaction is done.
transaction is the ITransaction instance associated with the
transaction being committed.
This should never fail. If this raises an exception, the
database is not expected to maintain consistency; it's a
serious error.
"""
def tpc_abort(transaction):
"""Abort a transaction.
transaction is the ITransaction instance associated with the
transaction being committed.
All changes made by the current transaction are aborted. Note
that this includes all changes stored in any savepoints that may
be associated with the current transaction.
tpc_abort() can be called at any time, either in or out of the
two-phase commit.
This should never fail.
"""
# The savepoint/rollback API.
def savepoint(transaction):
"""Save partial transaction changes.
There are two purposes:
1) To allow discarding partial changes without discarding all
dhanges.
2) To checkpoint changes to disk that would otherwise live in
memory for the duration of the transaction.
Returns an object implementing ISavePoint2 that can be used
to discard changes made since the savepoint was captured.
An implementation that doesn't support savepoints should implement
this method by returning a savepoint object that raises an
exception when its rollback method is called. The savepoint method
shouldn't raise an error. This way, transactions that create
savepoints can proceed as long as an attempt is never made to roll
back a savepoint.
"""
def discard(transaction):
"""Discard changes within the transaction since the last savepoint.
That means changes made since the last savepoint if one exists, or
since the start of the transaction.
"""
class IDataManagerOriginal(zope.interface.Interface):
"""Objects that manage transactional storage. """Objects that manage transactional storage.
These objects may manage data for other objects, or they may manage These objects may manage data for other objects, or they may manage
...@@ -155,7 +58,7 @@ class IDataManagerOriginal(zope.interface.Interface): ...@@ -155,7 +58,7 @@ class IDataManagerOriginal(zope.interface.Interface):
has been called; this is only used when the transaction is has been called; this is only used when the transaction is
being committed. being committed.
This call also implied the beginning of 2-phase commit. This call also implies the beginning of 2-phase commit.
""" """
# Two-phase commit protocol. These methods are called by the # Two-phase commit protocol. These methods are called by the
...@@ -180,10 +83,12 @@ class IDataManagerOriginal(zope.interface.Interface): ...@@ -180,10 +83,12 @@ class IDataManagerOriginal(zope.interface.Interface):
""" """
def tpc_abort(transaction): def tpc_abort(transaction):
"""Abort a transaction. """Abort a transaction.
This is called by a transaction manager to end a two-phase commit on
the data manager.
This is always called after a tpc_begin call. This is always called after a tpc_begin call.
transaction is the ITransaction instance associated with the transaction is the ITransaction instance associated with the
...@@ -202,6 +107,11 @@ class IDataManagerOriginal(zope.interface.Interface): ...@@ -202,6 +107,11 @@ class IDataManagerOriginal(zope.interface.Interface):
database is not expected to maintain consistency; it's a database is not expected to maintain consistency; it's a
serious error. serious error.
It's important that the storage calls the passed function
while it still has its lock. We don't want another thread
to be able to read any updated data until we've had a chance
to send an invalidation message to all of the other
connections!
""" """
def tpc_vote(transaction): def tpc_vote(transaction):
...@@ -214,125 +124,46 @@ class IDataManagerOriginal(zope.interface.Interface): ...@@ -214,125 +124,46 @@ class IDataManagerOriginal(zope.interface.Interface):
transaction being committed. transaction being committed.
""" """
def commit(object, transaction): def commit(transaction):
"""CCCommit changes to an object """Commit modifications to registered objects.
Save the object as part of the data to be made persistent if Save the object as part of the data to be made persistent if
the transaction commits. the transaction commits.
"""
def abort(object, transaction):
"""Abort changes to an object
Only changes made since the last transaction or
sub-transaction boundary are discarded.
This method may be called either:
o Outside of two-phase commit, or
o In the first phase of two-phase commit
"""
def sortKey():
"""
Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering
for resource managers.
"""
class IDataManager(zope.interface.Interface):
"""Data management interface for storing objects transactionally.
ZODB database connections currently provides the older
IDataManagerOriginal interface, but the intent is to move to this newer
IDataManager interface.
Our hope is that this interface will evolve and become the standard
interface. There are some issues to be resolved first, like:
- Probably want separate abort methods for use in and out of
two-phase commit.
- The savepoint api may need some more thought.
"""
def prepare(transaction):
"""Perform the first phase of a 2-phase commit
The data manager prepares for commit any changes to be made
persistent. A normal return from this method indicated that
the data manager is ready to commit the changes.
The data manager must raise an exception if it is not prepared
to commit the transaction after executing prepare().
The transaction must match that used for preceeding This includes conflict detection and handling. If no conflicts or
savepoints, if any. errors occur it saves the objects in the storage.
""" """
# This is equivalent to zodb3's tpc_begin, commit, and
# tpc_vote combined.
def abort(transaction): def abort(transaction):
"""Abort changes made by transaction """Abort a transaction and forget all changes.
This may be called before two-phase commit or in the second
phase of two-phase commit.
The transaction must match that used for preceeding
savepoints, if any.
"""
# This is equivalent to *both* zodb3's abort and tpc_abort
# calls. This should probably be split into 2 methods.
def commit(transaction):
"""Finish two-phase commit
The prepare method must be called, with the same transaction,
before calling commit.
"""
# This is equivalent to zodb3's tpc_finish
def savepoint(transaction):
"""Do tentative commit of changes to this point.
Should return an object implementing IRollback that can be used
to rollback to the savepoint.
Note that (unlike zodb3) this doesn't use a 2-phase commit
protocol. If this call fails, or if a rollback call on the
result fails, the (containing) transaction should be
aborted. Aborting the containing transaction is *not* the
responsibility of the data manager, however.
An implementation that doesn't support savepoints should Abort must be called outside of a two-phase commit.
implement this method by returning a rollback implementation
that always raises an error when it's rollback method is
called. The savepoing method shouldn't raise an error. This
way, transactions that create savepoints can proceed as long
as an attempt is never made to roll back a savepoint.
Abort is called by the transaction manager to abort transactions
that are not yet in a two-phase commit.
""" """
def sortKey(): def sortKey():
""" """Return a key to use for ordering registered DataManagers
Return a key to use for ordering registered DataManagers
ZODB uses a global sort order to prevent deadlock when it commits ZODB uses a global sort order to prevent deadlock when it commits
transactions involving multiple resource managers. The resource transactions involving multiple resource managers. The resource
manager must define a sortKey() method that provides a global ordering manager must define a sortKey() method that provides a global ordering
for resource managers. for resource managers.
""" """
# XXX: Alternate version:
#"""Return a consistent sort key for this connection.
#
#This allows ordering multiple connections that use the same storage in
#a consistent manner. This is unique for the lifetime of a connection,
#which is good enough to avoid ZEO deadlocks.
#"""
def beforeCompletion(transaction):
"""Hook that is called by the transaction before completing a commit"""
def afterCompletion(transaction):
"""Hook that is called by the transaction after completing a commit"""
class ITransaction(zope.interface.Interface): class ITransaction(zope.interface.Interface):
"""Object representing a running transaction. """Object representing a running transaction.
...@@ -414,34 +245,6 @@ class ITransaction(zope.interface.Interface): ...@@ -414,34 +245,6 @@ class ITransaction(zope.interface.Interface):
# Unsure: is this allowed to cause an exception here, during # Unsure: is this allowed to cause an exception here, during
# the two-phase commit, or can it toss data silently? # the two-phase commit, or can it toss data silently?
class ISavePoint(zope.interface.Interface):
"""ISavePoint objects represent partial transaction changes.
Sequences of savepoint objects are associated with transactions,
and with IResourceManagers.
"""
def rollback():
"""Discard changes made after this savepoint.
This includes discarding (call the discard method on) all
subsequent savepoints.
"""
def discard():
"""Discard changes saved by this savepoint.
That means changes made since the immediately preceding
savepoint if one exists, or since the start of the transaction,
until this savepoint.
Once a savepoint has been discarded, it's an error to attempt
to rollback or discard it again.
"""
next_savepoint = zope.interface.Attribute(
"""The next savepoint (later in time), or None if self is the
most recent savepoint.""")
class IRollback(zope.interface.Interface): class IRollback(zope.interface.Interface):
...@@ -457,3 +260,4 @@ class IRollback(zope.interface.Interface): ...@@ -457,3 +260,4 @@ class IRollback(zope.interface.Interface):
- The transaction has ended. - The transaction has ended.
""" """
# Packaging information for zpkg.
header proxy.h
<extension _zope_proxy_proxy>
source _zope_proxy_proxy.c
depends-on proxy.h
</extension>
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""More convenience functions for dealing with proxies.
$Id$
"""
from zope.interface import moduleProvides
from zope.proxy.interfaces import IProxyIntrospection
from types import ClassType
from zope.proxy._zope_proxy_proxy import *
from zope.proxy._zope_proxy_proxy import _CAPI
moduleProvides(IProxyIntrospection)
__all__ = tuple(IProxyIntrospection)
def ProxyIterator(p):
yield p
while isProxy(p):
p = getProxiedObject(p)
yield p
/*############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################*/
/*
* This file is also used as a really extensive macro in
* ../app/container/_zope_app_container_contained.c. If you need to
* change this file, you need to "svn copy" it to ../app/container/.
*
* This approach is taken to allow the sources for the two packages
* to be compilable when the relative locations of these aren't
* related in the same way as they are in a checkout.
*
* This will be revisited in the future, but works for now.
*/
#include "Python.h"
#include "modsupport.h"
#define PROXY_MODULE
#include "proxy.h"
static PyTypeObject ProxyType;
#define Proxy_Check(wrapper) (PyObject_TypeCheck((wrapper), &ProxyType))
static PyObject *
empty_tuple = NULL;
/*
* Slot methods.
*/
static PyObject *
wrap_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
PyObject *result = NULL;
PyObject *object;
if (PyArg_UnpackTuple(args, "__new__", 1, 1, &object)) {
if (kwds != NULL && PyDict_Size(kwds) != 0) {
PyErr_SetString(PyExc_TypeError,
"proxy.__new__ does not accept keyword args");
return NULL;
}
result = PyType_GenericNew(type, args, kwds);
if (result != NULL) {
ProxyObject *wrapper = (ProxyObject *) result;
Py_INCREF(object);
wrapper->proxy_object = object;
}
}
return result;
}
static int
wrap_init(PyObject *self, PyObject *args, PyObject *kwds)
{
int result = -1;
PyObject *object;
if (PyArg_UnpackTuple(args, "__init__", 1, 1, &object)) {
ProxyObject *wrapper = (ProxyObject *)self;
if (kwds != NULL && PyDict_Size(kwds) != 0) {
PyErr_SetString(PyExc_TypeError,
"proxy.__init__ does not accept keyword args");
return -1;
}
/* If the object in this proxy is not the one we
* received in args, replace it with the new one.
*/
if (wrapper->proxy_object != object) {
PyObject *temp = wrapper->proxy_object;
Py_INCREF(object);
wrapper->proxy_object = object;
Py_DECREF(temp);
}
result = 0;
}
return result;
}
static int
wrap_traverse(PyObject *self, visitproc visit, void *arg)
{
PyObject *ob = Proxy_GET_OBJECT(self);
if (ob != NULL)
return visit(ob, arg);
else
return 0;
}
static int
wrap_clear(PyObject *self)
{
ProxyObject *proxy = (ProxyObject *)self;
PyObject *temp = proxy->proxy_object;
if (temp != NULL) {
proxy->proxy_object = NULL;
Py_DECREF(temp);
}
return 0;
}
static PyObject *
wrap_richcompare(PyObject* self, PyObject* other, int op)
{
if (Proxy_Check(self)) {
self = Proxy_GET_OBJECT(self);
}
else {
other = Proxy_GET_OBJECT(other);
}
return PyObject_RichCompare(self, other, op);
}
static PyObject *
wrap_iter(PyObject *self)
{
return PyObject_GetIter(Proxy_GET_OBJECT(self));
}
static PyObject *
wrap_iternext(PyObject *self)
{
return PyIter_Next(Proxy_GET_OBJECT(self));
}
static void
wrap_dealloc(PyObject *self)
{
(void) wrap_clear(self);
self->ob_type->tp_free(self);
}
/* A variant of _PyType_Lookup that doesn't look in ProxyType.
*
* If argument search_wrappertype is nonzero, we can look in WrapperType.
*/
PyObject *
WrapperType_Lookup(PyTypeObject *type, PyObject *name)
{
int i, n;
PyObject *mro, *res, *base, *dict;
/* Look in tp_dict of types in MRO */
mro = type->tp_mro;
/* If mro is NULL, the type is either not yet initialized
by PyType_Ready(), or already cleared by type_clear().
Either way the safest thing to do is to return NULL. */
if (mro == NULL)
return NULL;
assert(PyTuple_Check(mro));
n = PyTuple_GET_SIZE(mro)
- 1; /* We don't want to look at the last item, which is object. */
for (i = 0; i < n; i++) {
base = PyTuple_GET_ITEM(mro, i);
if (((PyTypeObject *)base) != &ProxyType) {
if (PyClass_Check(base))
dict = ((PyClassObject *)base)->cl_dict;
else {
assert(PyType_Check(base));
dict = ((PyTypeObject *)base)->tp_dict;
}
assert(dict && PyDict_Check(dict));
res = PyDict_GetItem(dict, name);
if (res != NULL)
return res;
}
}
return NULL;
}
static PyObject *
wrap_getattro(PyObject *self, PyObject *name)
{
PyObject *wrapped;
PyObject *descriptor;
PyObject *res = NULL;
char *name_as_string;
int maybe_special_name;
#ifdef Py_USING_UNICODE
/* The Unicode to string conversion is done here because the
existing tp_getattro slots expect a string object as name
and we wouldn't want to break those. */
if (PyUnicode_Check(name)) {
name = PyUnicode_AsEncodedString(name, NULL, NULL);
if (name == NULL)
return NULL;
}
else
#endif
if (!PyString_Check(name)){
PyErr_SetString(PyExc_TypeError, "attribute name must be string");
return NULL;
}
else
Py_INCREF(name);
name_as_string = PyString_AS_STRING(name);
wrapped = Proxy_GET_OBJECT(self);
if (wrapped == NULL) {
PyErr_Format(PyExc_RuntimeError,
"object is NULL; requested to get attribute '%s'",
name_as_string);
goto finally;
}
maybe_special_name = name_as_string[0] == '_' && name_as_string[1] == '_';
if (!(maybe_special_name && strcmp(name_as_string, "__class__") == 0)) {
descriptor = WrapperType_Lookup(self->ob_type, name);
if (descriptor != NULL) {
if (PyType_HasFeature(descriptor->ob_type, Py_TPFLAGS_HAVE_CLASS)
&& descriptor->ob_type->tp_descr_get != NULL) {
res = descriptor->ob_type->tp_descr_get(
descriptor,
self,
(PyObject *)self->ob_type);
} else {
Py_INCREF(descriptor);
res = descriptor;
}
goto finally;
}
}
res = PyObject_GetAttr(wrapped, name);
finally:
Py_DECREF(name);
return res;
}
static int
wrap_setattro(PyObject *self, PyObject *name, PyObject *value)
{
PyObject *wrapped;
PyObject *descriptor;
int res = -1;
#ifdef Py_USING_UNICODE
/* The Unicode to string conversion is done here because the
existing tp_setattro slots expect a string object as name
and we wouldn't want to break those. */
if (PyUnicode_Check(name)) {
name = PyUnicode_AsEncodedString(name, NULL, NULL);
if (name == NULL)
return -1;
}
else
#endif
if (!PyString_Check(name)){
PyErr_SetString(PyExc_TypeError, "attribute name must be string");
return -1;
}
else
Py_INCREF(name);
descriptor = WrapperType_Lookup(self->ob_type, name);
if (descriptor != NULL) {
if (PyType_HasFeature(descriptor->ob_type, Py_TPFLAGS_HAVE_CLASS) &&
descriptor->ob_type->tp_descr_set != NULL) {
res = descriptor->ob_type->tp_descr_set(descriptor, self, value);
} else {
PyErr_Format(PyExc_TypeError,
"Tried to set attribute '%s' on wrapper, but it is not"
" a data descriptor", PyString_AS_STRING(name));
}
goto finally;
}
wrapped = Proxy_GET_OBJECT(self);
if (wrapped == NULL) {
PyErr_Format(PyExc_RuntimeError,
"object is NULL; requested to set attribute '%s'",
PyString_AS_STRING(name));
goto finally;
}
res = PyObject_SetAttr(wrapped, name, value);
finally:
Py_DECREF(name);
return res;
}
static int
wrap_print(PyObject *wrapper, FILE *fp, int flags)
{
return PyObject_Print(Proxy_GET_OBJECT(wrapper), fp, flags);
}
static PyObject *
wrap_str(PyObject *wrapper) {
return PyObject_Str(Proxy_GET_OBJECT(wrapper));
}
static PyObject *
wrap_repr(PyObject *wrapper)
{
return PyObject_Repr(Proxy_GET_OBJECT(wrapper));
}
static int
wrap_compare(PyObject *wrapper, PyObject *v)
{
return PyObject_Compare(Proxy_GET_OBJECT(wrapper), v);
}
static long
wrap_hash(PyObject *self)
{
return PyObject_Hash(Proxy_GET_OBJECT(self));
}
static PyObject *
wrap_call(PyObject *self, PyObject *args, PyObject *kw)
{
if (kw)
return PyEval_CallObjectWithKeywords(Proxy_GET_OBJECT(self),
args, kw);
else
return PyObject_CallObject(Proxy_GET_OBJECT(self), args);
}
/*
* Number methods
*/
/*
* Number methods.
*/
static PyObject *
call_int(PyObject *self)
{
PyNumberMethods *nb = self->ob_type->tp_as_number;
if (nb == NULL || nb->nb_int == NULL) {
PyErr_SetString(PyExc_TypeError,
"object can't be converted to int");
return NULL;
}
return nb->nb_int(self);
}
static PyObject *
call_long(PyObject *self)
{
PyNumberMethods *nb = self->ob_type->tp_as_number;
if (nb == NULL || nb->nb_long == NULL) {
PyErr_SetString(PyExc_TypeError,
"object can't be converted to long");
return NULL;
}
return nb->nb_long(self);
}
static PyObject *
call_float(PyObject *self)
{
PyNumberMethods *nb = self->ob_type->tp_as_number;
if (nb == NULL || nb->nb_float== NULL) {
PyErr_SetString(PyExc_TypeError,
"object can't be converted to float");
return NULL;
}
return nb->nb_float(self);
}
static PyObject *
call_oct(PyObject *self)
{
PyNumberMethods *nb = self->ob_type->tp_as_number;
if (nb == NULL || nb->nb_oct== NULL) {
PyErr_SetString(PyExc_TypeError,
"object can't be converted to oct");
return NULL;
}
return nb->nb_oct(self);
}
static PyObject *
call_hex(PyObject *self)
{
PyNumberMethods *nb = self->ob_type->tp_as_number;
if (nb == NULL || nb->nb_hex == NULL) {
PyErr_SetString(PyExc_TypeError,
"object can't be converted to hex");
return NULL;
}
return nb->nb_hex(self);
}
static PyObject *
call_ipow(PyObject *self, PyObject *other)
{
/* PyNumber_InPlacePower has three args. How silly. :-) */
return PyNumber_InPlacePower(self, other, Py_None);
}
typedef PyObject *(*function1)(PyObject *);
static PyObject *
check1(ProxyObject *self, char *opname, function1 operation)
{
PyObject *result = NULL;
result = operation(Proxy_GET_OBJECT(self));
#if 0
if (result != NULL)
/* XXX create proxy for result? */
;
#endif
return result;
}
static PyObject *
check2(PyObject *self, PyObject *other,
char *opname, char *ropname, binaryfunc operation)
{
PyObject *result = NULL;
PyObject *object;
if (Proxy_Check(self)) {
object = Proxy_GET_OBJECT(self);
result = operation(object, other);
}
else if (Proxy_Check(other)) {
object = Proxy_GET_OBJECT(other);
result = operation(self, object);
}
else {
Py_INCREF(Py_NotImplemented);
return Py_NotImplemented;
}
#if 0
if (result != NULL)
/* XXX create proxy for result? */
;
#endif
return result;
}
static PyObject *
check2i(ProxyObject *self, PyObject *other,
char *opname, binaryfunc operation)
{
PyObject *result = NULL;
PyObject *object = Proxy_GET_OBJECT(self);
result = operation(object, other);
if (result == object) {
/* If the operation was really carried out inplace,
don't create a new proxy, but use the old one. */
Py_INCREF(self);
Py_DECREF(object);
result = (PyObject *)self;
}
#if 0
else if (result != NULL)
/* XXX create proxy for result? */
;
#endif
return result;
}
#define UNOP(NAME, CALL) \
static PyObject *wrap_##NAME(PyObject *self) \
{ return check1((ProxyObject *)self, "__"#NAME"__", CALL); }
#define BINOP(NAME, CALL) \
static PyObject *wrap_##NAME(PyObject *self, PyObject *other) \
{ return check2(self, other, "__"#NAME"__", "__r"#NAME"__", CALL); }
#define INPLACE(NAME, CALL) \
static PyObject *wrap_i##NAME(PyObject *self, PyObject *other) \
{ return check2i((ProxyObject *)self, other, "__i"#NAME"__", CALL); }
BINOP(add, PyNumber_Add)
BINOP(sub, PyNumber_Subtract)
BINOP(mul, PyNumber_Multiply)
BINOP(div, PyNumber_Divide)
BINOP(mod, PyNumber_Remainder)
BINOP(divmod, PyNumber_Divmod)
static PyObject *
wrap_pow(PyObject *self, PyObject *other, PyObject *modulus)
{
PyObject *result = NULL;
PyObject *object;
if (Proxy_Check(self)) {
object = Proxy_GET_OBJECT(self);
result = PyNumber_Power(object, other, modulus);
}
else if (Proxy_Check(other)) {
object = Proxy_GET_OBJECT(other);
result = PyNumber_Power(self, object, modulus);
}
else if (modulus != NULL && Proxy_Check(modulus)) {
object = Proxy_GET_OBJECT(modulus);
result = PyNumber_Power(self, other, modulus);
}
else {
Py_INCREF(Py_NotImplemented);
return Py_NotImplemented;
}
return result;
}
BINOP(lshift, PyNumber_Lshift)
BINOP(rshift, PyNumber_Rshift)
BINOP(and, PyNumber_And)
BINOP(xor, PyNumber_Xor)
BINOP(or, PyNumber_Or)
static int
wrap_coerce(PyObject **p_self, PyObject **p_other)
{
PyObject *self = *p_self;
PyObject *other = *p_other;
PyObject *object;
PyObject *left;
PyObject *right;
int r;
assert(Proxy_Check(self));
object = Proxy_GET_OBJECT(self);
left = object;
right = other;
r = PyNumber_CoerceEx(&left, &right);
if (r != 0)
return r;
/* Now left and right have been INCREF'ed. Any new value that
comes out is proxied; any unchanged value is left unchanged. */
if (left == object) {
/* Keep the old proxy */
Py_INCREF(self);
Py_DECREF(left);
left = self;
}
#if 0
else {
/* XXX create proxy for left? */
}
if (right != other) {
/* XXX create proxy for right? */
}
#endif
*p_self = left;
*p_other = right;
return 0;
}
UNOP(neg, PyNumber_Negative)
UNOP(pos, PyNumber_Positive)
UNOP(abs, PyNumber_Absolute)
UNOP(invert, PyNumber_Invert)
UNOP(int, call_int)
UNOP(long, call_long)
UNOP(float, call_float)
UNOP(oct, call_oct)
UNOP(hex, call_hex)
INPLACE(add, PyNumber_InPlaceAdd)
INPLACE(sub, PyNumber_InPlaceSubtract)
INPLACE(mul, PyNumber_InPlaceMultiply)
INPLACE(div, PyNumber_InPlaceDivide)
INPLACE(mod, PyNumber_InPlaceRemainder)
INPLACE(pow, call_ipow)
INPLACE(lshift, PyNumber_InPlaceLshift)
INPLACE(rshift, PyNumber_InPlaceRshift)
INPLACE(and, PyNumber_InPlaceAnd)
INPLACE(xor, PyNumber_InPlaceXor)
INPLACE(or, PyNumber_InPlaceOr)
BINOP(floordiv, PyNumber_FloorDivide)
BINOP(truediv, PyNumber_TrueDivide)
INPLACE(floordiv, PyNumber_InPlaceFloorDivide)
INPLACE(truediv, PyNumber_InPlaceTrueDivide)
static int
wrap_nonzero(PyObject *self)
{
return PyObject_IsTrue(Proxy_GET_OBJECT(self));
}
/*
* Sequence methods
*/
static int
wrap_length(PyObject *self)
{
return PyObject_Length(Proxy_GET_OBJECT(self));
}
static PyObject *
wrap_slice(PyObject *self, int start, int end)
{
return PySequence_GetSlice(Proxy_GET_OBJECT(self), start, end);
}
static int
wrap_ass_slice(PyObject *self, int i, int j, PyObject *value)
{
return PySequence_SetSlice(Proxy_GET_OBJECT(self), i, j, value);
}
static int
wrap_contains(PyObject *self, PyObject *value)
{
return PySequence_Contains(Proxy_GET_OBJECT(self), value);
}
/*
* Mapping methods
*/
static PyObject *
wrap_getitem(PyObject *wrapper, PyObject *v) {
return PyObject_GetItem(Proxy_GET_OBJECT(wrapper), v);
}
static int
wrap_setitem(PyObject *self, PyObject *key, PyObject *value)
{
if (value == NULL)
return PyObject_DelItem(Proxy_GET_OBJECT(self), key);
else
return PyObject_SetItem(Proxy_GET_OBJECT(self), key, value);
}
/*
* Normal methods
*/
static char
reduce__doc__[] =
"__reduce__()\n"
"Raise an exception; this prevents proxies from being picklable by\n"
"default, even if the underlying object is picklable.";
static PyObject *
wrap_reduce(PyObject *self)
{
PyObject *pickle_error = NULL;
PyObject *pickle = PyImport_ImportModule("pickle");
if (pickle == NULL)
PyErr_Clear();
else {
pickle_error = PyObject_GetAttrString(pickle, "PicklingError");
if (pickle_error == NULL)
PyErr_Clear();
}
if (pickle_error == NULL) {
pickle_error = PyExc_RuntimeError;
Py_INCREF(pickle_error);
}
PyErr_SetString(pickle_error,
"proxy instances cannot be pickled");
Py_DECREF(pickle_error);
return NULL;
}
static PyNumberMethods
wrap_as_number = {
wrap_add, /* nb_add */
wrap_sub, /* nb_subtract */
wrap_mul, /* nb_multiply */
wrap_div, /* nb_divide */
wrap_mod, /* nb_remainder */
wrap_divmod, /* nb_divmod */
wrap_pow, /* nb_power */
wrap_neg, /* nb_negative */
wrap_pos, /* nb_positive */
wrap_abs, /* nb_absolute */
wrap_nonzero, /* nb_nonzero */
wrap_invert, /* nb_invert */
wrap_lshift, /* nb_lshift */
wrap_rshift, /* nb_rshift */
wrap_and, /* nb_and */
wrap_xor, /* nb_xor */
wrap_or, /* nb_or */
wrap_coerce, /* nb_coerce */
wrap_int, /* nb_int */
wrap_long, /* nb_long */
wrap_float, /* nb_float */
wrap_oct, /* nb_oct */
wrap_hex, /* nb_hex */
/* Added in release 2.0 */
/* These require the Py_TPFLAGS_HAVE_INPLACEOPS flag */
wrap_iadd, /* nb_inplace_add */
wrap_isub, /* nb_inplace_subtract */
wrap_imul, /* nb_inplace_multiply */
wrap_idiv, /* nb_inplace_divide */
wrap_imod, /* nb_inplace_remainder */
(ternaryfunc)wrap_ipow, /* nb_inplace_power */
wrap_ilshift, /* nb_inplace_lshift */
wrap_irshift, /* nb_inplace_rshift */
wrap_iand, /* nb_inplace_and */
wrap_ixor, /* nb_inplace_xor */
wrap_ior, /* nb_inplace_or */
/* Added in release 2.2 */
/* These require the Py_TPFLAGS_HAVE_CLASS flag */
wrap_floordiv, /* nb_floor_divide */
wrap_truediv, /* nb_true_divide */
wrap_ifloordiv, /* nb_inplace_floor_divide */
wrap_itruediv, /* nb_inplace_true_divide */
};
static PySequenceMethods
wrap_as_sequence = {
wrap_length, /* sq_length */
0, /* sq_concat */
0, /* sq_repeat */
0, /* sq_item */
wrap_slice, /* sq_slice */
0, /* sq_ass_item */
wrap_ass_slice, /* sq_ass_slice */
wrap_contains, /* sq_contains */
};
static PyMappingMethods
wrap_as_mapping = {
wrap_length, /* mp_length */
wrap_getitem, /* mp_subscript */
wrap_setitem, /* mp_ass_subscript */
};
static PyMethodDef
wrap_methods[] = {
{"__reduce__", (PyCFunction)wrap_reduce, METH_NOARGS, reduce__doc__},
{NULL, NULL},
};
/*
* Note that the numeric methods are not supported. This is primarily
* because of the way coercion-less operations are performed with
* new-style numbers; since we can't tell which side of the operation
* is 'self', we can't ensure we'd unwrap the right thing to perform
* the actual operation. We also can't afford to just unwrap both
* sides the way weakrefs do, since we don't know what semantics will
* be associated with the wrapper itself.
*/
statichere PyTypeObject
ProxyType = {
PyObject_HEAD_INIT(NULL) /* PyObject_HEAD_INIT(&PyType_Type) */
0,
"zope.proxy.ProxyBase",
sizeof(ProxyObject),
0,
wrap_dealloc, /* tp_dealloc */
wrap_print, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
wrap_compare, /* tp_compare */
wrap_repr, /* tp_repr */
&wrap_as_number, /* tp_as_number */
&wrap_as_sequence, /* tp_as_sequence */
&wrap_as_mapping, /* tp_as_mapping */
wrap_hash, /* tp_hash */
wrap_call, /* tp_call */
wrap_str, /* tp_str */
wrap_getattro, /* tp_getattro */
wrap_setattro, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC
| Py_TPFLAGS_CHECKTYPES | Py_TPFLAGS_BASETYPE, /* tp_flags */
0, /* tp_doc */
wrap_traverse, /* tp_traverse */
wrap_clear, /* tp_clear */
wrap_richcompare, /* tp_richcompare */
0, /* tp_weaklistoffset */
wrap_iter, /* tp_iter */
wrap_iternext, /* tp_iternext */
wrap_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
wrap_init, /* tp_init */
0, /* tp_alloc */
wrap_new, /* tp_new */
0, /*_PyObject_GC_Del,*/ /* tp_free */
};
static PyObject *
create_proxy(PyObject *object)
{
PyObject *result = NULL;
PyObject *args;
args = PyTuple_New(1);
if (args != NULL) {
Py_INCREF(object);
PyTuple_SET_ITEM(args, 0, object);
result = PyObject_CallObject((PyObject *)&ProxyType, args);
Py_DECREF(args);
}
return result;
}
static int
api_check(PyObject *obj)
{
return obj ? Proxy_Check(obj) : 0;
}
static PyObject *
api_create(PyObject *object)
{
if (object == NULL) {
PyErr_SetString(PyExc_ValueError,
"cannot create proxy around NULL");
return NULL;
}
return create_proxy(object);
}
static PyObject *
api_getobject(PyObject *proxy)
{
if (proxy == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"cannot pass NULL to ProxyAPI.getobject()");
return NULL;
}
if (Proxy_Check(proxy))
return Proxy_GET_OBJECT(proxy);
else {
PyErr_Format(PyExc_TypeError, "expected proxy object, got %s",
proxy->ob_type->tp_name);
return NULL;
}
}
static ProxyInterface
wrapper_capi = {
&ProxyType,
api_check,
api_create,
api_getobject,
};
static PyObject *api_object = NULL;
static char
getobject__doc__[] =
"getProxiedObject(proxy) --> object\n"
"\n"
"Get the underlying object for proxy, or the object itself, if it is\n"
"not a proxy.";
static PyObject *
wrapper_getobject(PyObject *unused, PyObject *obj)
{
if (Proxy_Check(obj))
obj = Proxy_GET_OBJECT(obj);
if (obj == NULL)
obj = Py_None;
Py_INCREF(obj);
return obj;
}
static char
isProxy__doc__[] =
"Check whether the given object is a proxy\n"
"\n"
"If proxytype is not None, checkes whether the object is\n"
"proxied by the given proxytype.\n"
;
static PyObject *
wrapper_isProxy(PyObject *unused, PyObject *args)
{
PyObject *obj, *result;
PyTypeObject *proxytype=&ProxyType;
if (! PyArg_ParseTuple(args, "O|O!:isProxy",
&obj, &PyType_Type, &proxytype)
)
return NULL;
while (obj && Proxy_Check(obj))
{
if (PyObject_TypeCheck(obj, proxytype))
{
result = Py_True;
Py_INCREF(result);
return result;
}
obj = Proxy_GET_OBJECT(obj);
}
result = Py_False;
Py_INCREF(result);
return result;
}
static char
removeAllProxies__doc__[] =
"removeAllProxies(proxy) --> object\n"
"\n"
"Get the proxied object with no proxies\n"
"\n"
"If obj is not a proxied object, return obj.\n"
"\n"
"The returned object has no proxies.\n"
;
static PyObject *
wrapper_removeAllProxies(PyObject *unused, PyObject *obj)
{
while (obj && Proxy_Check(obj))
obj = Proxy_GET_OBJECT(obj);
if (obj == NULL)
obj = Py_None;
Py_INCREF(obj);
return obj;
}
static char
sameProxiedObjects__doc__[] =
"Check whether two objects are the same or proxies of the same object";
static PyObject *
wrapper_sameProxiedObjects(PyObject *unused, PyObject *args)
{
PyObject *ob1, *ob2;
if (! PyArg_ParseTuple(args, "OO:sameProxiedObjects", &ob1, &ob2))
return NULL;
while (ob1 && Proxy_Check(ob1))
ob1 = Proxy_GET_OBJECT(ob1);
while (ob2 && Proxy_Check(ob2))
ob2 = Proxy_GET_OBJECT(ob2);
if (ob1 == ob2)
ob1 = Py_True;
else
ob1 = Py_False;
Py_INCREF(ob1);
return ob1;
}
static char
queryProxy__doc__[] =
"Look for a proxy of the given type around the object\n"
"\n"
"If no such proxy can be found, return the default.\n"
;
static PyObject *
wrapper_queryProxy(PyObject *unused, PyObject *args)
{
PyObject *obj, *result=Py_None;
PyTypeObject *proxytype=&ProxyType;
if (! PyArg_ParseTuple(args, "O|O!O:queryProxy",
&obj, &PyType_Type, &proxytype, &result)
)
return NULL;
while (obj && Proxy_Check(obj))
{
if (PyObject_TypeCheck(obj, proxytype))
{
Py_INCREF(obj);
return obj;
}
obj = Proxy_GET_OBJECT(obj);
}
Py_INCREF(result);
return result;
}
static char
queryInnerProxy__doc__[] =
"Look for the inner-most proxy of the given type around the object\n"
"\n"
"If no such proxy can be found, return the default.\n"
"\n"
"If there is such a proxy, return the inner-most one.\n"
;
static PyObject *
wrapper_queryInnerProxy(PyObject *unused, PyObject *args)
{
PyObject *obj, *result=Py_None;
PyTypeObject *proxytype=&ProxyType;
if (! PyArg_ParseTuple(args, "O|O!O:queryInnerProxy",
&obj, &PyType_Type, &proxytype, &result)
)
return NULL;
while (obj && Proxy_Check(obj))
{
if (PyObject_TypeCheck(obj, proxytype))
result = obj;
obj = Proxy_GET_OBJECT(obj);
}
Py_INCREF(result);
return result;
}
static char
module___doc__[] =
"Association between an object, a context object, and a dictionary.\n\
\n\
The context object and dictionary give additional context information\n\
associated with a reference to the basic object. The wrapper objects\n\
act as proxies for the original object.";
static PyMethodDef
module_functions[] = {
{"getProxiedObject", wrapper_getobject, METH_O, getobject__doc__},
{"isProxy", wrapper_isProxy, METH_VARARGS, isProxy__doc__},
{"sameProxiedObjects", wrapper_sameProxiedObjects, METH_VARARGS,
sameProxiedObjects__doc__},
{"queryProxy", wrapper_queryProxy, METH_VARARGS, queryProxy__doc__},
{"queryInnerProxy", wrapper_queryInnerProxy, METH_VARARGS,
queryInnerProxy__doc__},
{"removeAllProxies", wrapper_removeAllProxies, METH_O,
removeAllProxies__doc__},
{NULL}
};
void
init_zope_proxy_proxy(void)
{
PyObject *m = Py_InitModule3("_zope_proxy_proxy",
module_functions, module___doc__);
if (m == NULL)
return;
if (empty_tuple == NULL)
empty_tuple = PyTuple_New(0);
ProxyType.tp_free = _PyObject_GC_Del;
if (PyType_Ready(&ProxyType) < 0)
return;
Py_INCREF(&ProxyType);
PyModule_AddObject(m, "ProxyBase", (PyObject *)&ProxyType);
if (api_object == NULL) {
api_object = PyCObject_FromVoidPtr(&wrapper_capi, NULL);
if (api_object == NULL)
return;
}
Py_INCREF(api_object);
PyModule_AddObject(m, "_CAPI", api_object);
}
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Proxy-related interfaces.
$Id$
"""
from zope.interface import Interface
class IProxyIntrospection(Interface):
"""Provides methods for indentifying proxies and extracting proxied objects
"""
def isProxy(obj, proxytype=None):
"""Check whether the given object is a proxy
If proxytype is not None, checkes whether the object is
proxied by the given proxytype.
"""
def sameProxiedObjects(ob1, ob2):
"""Check whether ob1 and ob2 are the same or proxies of the same object
"""
def getProxiedObject(obj):
"""Get the proxied Object
If the object isn't proxied, then just return the object.
"""
def removeAllProxies(obj):
"""Get the proxied object with no proxies
If obj is not a proxied object, return obj.
The returned object has no proxies.
"""
def queryProxy(obj, proxytype, default=None):
"""Look for a proxy of the given type around the object
If no such proxy can be found, return the default.
"""
def queryInnerProxy(obj, proxytype, default=None):
"""Look for the inner-most proxy of the given type around the object
If no such proxy can be found, return the default.
If there is such a proxy, return the inner-most one.
"""
#ifndef _proxy_H_
#define _proxy_H_ 1
typedef struct {
PyObject_HEAD
PyObject *proxy_object;
} ProxyObject;
#define Proxy_GET_OBJECT(ob) (((ProxyObject *)(ob))->proxy_object)
typedef struct {
PyTypeObject *proxytype;
int (*check)(PyObject *obj);
PyObject *(*create)(PyObject *obj);
PyObject *(*getobject)(PyObject *proxy);
} ProxyInterface;
#ifndef PROXY_MODULE
/* These are only defined in the public interface, and are not
* available within the module implementation. There we use the
* classic Python/C API only.
*/
static ProxyInterface *_proxy_api = NULL;
static int
Proxy_Import(void)
{
if (_proxy_api == NULL) {
PyObject *m = PyImport_ImportModule("zope.proxy");
if (m != NULL) {
PyObject *tmp = PyObject_GetAttrString(m, "_CAPI");
if (tmp != NULL) {
if (PyCObject_Check(tmp))
_proxy_api = (ProxyInterface *)
PyCObject_AsVoidPtr(tmp);
Py_DECREF(tmp);
}
}
}
return (_proxy_api == NULL) ? -1 : 0;
}
#define ProxyType (*_proxy_api->proxytype)
#define Proxy_Check(obj) (_proxy_api->check((obj)))
#define Proxy_CheckExact(obj) ((obj)->ob_type == ProxyType)
#define Proxy_New(obj) (_proxy_api->create((obj)))
#define Proxy_GetObject(proxy) (_proxy_api->getobject((proxy)))
#endif /* PROXY_MODULE */
#endif /* _proxy_H_ */
#
# This file is necessary to make this directory a package.
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test base proxy class.
$Id$
"""
import pickle
import sys
import unittest
from zope.testing.doctestunit import DocTestSuite
from zope.proxy import ProxyBase
class Thing:
"""This class is expected to be a classic class."""
class Comparable(object):
def __init__(self, value):
self.value = value
def __eq__(self, other):
if hasattr(other, "value"):
other = other.value
return self.value == other
def __ne__(self, other):
return not self.__eq__(other)
def __lt__(self, other):
if hasattr(other, "value"):
other = other.value
return self.value < other
def __ge__(self, other):
return not self.__lt__(other)
def __le__(self, other):
if hasattr(other, "value"):
other = other.value
return self.value <= other
def __gt__(self, other):
return not self.__le__(other)
def __repr__(self):
return "<Comparable: %r>" % self.value
class ProxyTestCase(unittest.TestCase):
proxy_class = ProxyBase
def setUp(self):
self.x = Thing()
self.p = self.new_proxy(self.x)
def new_proxy(self, o):
return self.proxy_class(o)
def test_constructor(self):
o = object()
self.assertRaises(TypeError, self.proxy_class, o, o)
self.assertRaises(TypeError, self.proxy_class, o, key='value')
self.assertRaises(TypeError, self.proxy_class, key='value')
def test_subclass_constructor(self):
class MyProxy(self.proxy_class):
def __new__(cls, *args, **kwds):
return super(MyProxy, cls).__new__(cls, *args, **kwds)
def __init__(self, *args, **kwds):
super(MyProxy, self).__init__(*args, **kwds)
o1 = object()
o2 = object()
o = MyProxy((o1, o2))
self.assertEquals(o1, o[0])
self.assertEquals(o2, o[1])
self.assertRaises(TypeError, MyProxy, o1, o2)
self.assertRaises(TypeError, MyProxy, o1, key='value')
self.assertRaises(TypeError, MyProxy, key='value')
# Check that are passed to __init__() overrides what's passed
# to __new__().
class MyProxy2(self.proxy_class):
def __new__(cls, *args, **kwds):
return super(MyProxy2, cls).__new__(cls, 'value')
p = MyProxy2('splat!')
self.assertEquals(list(p), list('splat!'))
class MyProxy3(MyProxy2):
def __init__(self, arg):
if list(self) != list('value'):
raise AssertionError("list(self) != list('value')")
super(MyProxy3, self).__init__('another')
p = MyProxy3('notused')
self.assertEquals(list(p), list('another'))
def test_proxy_attributes(self):
o = Thing()
o.foo = 1
w = self.new_proxy(o)
self.assert_(w.foo == 1)
def test___class__(self):
o = object()
w = self.new_proxy(o)
self.assert_(w.__class__ is o.__class__)
def test_pickle_prevention(self):
w = self.new_proxy(Thing())
self.assertRaises(pickle.PicklingError,
pickle.dumps, w)
def test_proxy_equality(self):
w = self.new_proxy('foo')
self.assertEquals(w, 'foo')
o1 = Comparable(1)
o2 = Comparable(1.0)
o3 = Comparable("splat!")
w1 = self.new_proxy(o1)
w2 = self.new_proxy(o2)
w3 = self.new_proxy(o3)
self.assertEquals(o1, w1)
self.assertEquals(o1, w2)
self.assertEquals(o2, w1)
self.assertEquals(w1, o2)
self.assertEquals(w2, o1)
self.assertNotEquals(o3, w1)
self.assertNotEquals(w1, o3)
self.assertNotEquals(w3, o1)
self.assertNotEquals(o1, w3)
def test_proxy_ordering_lt(self):
o1 = Comparable(1)
o2 = Comparable(2.0)
w1 = self.new_proxy(o1)
w2 = self.new_proxy(o2)
self.assert_(w1 < w2)
self.assert_(w1 <= w2)
self.assert_(o1 < w2)
self.assert_(o1 <= w2)
self.assert_(w1 < o2)
self.assert_(w2 <= o2)
def test_proxy_callable(self):
w = self.new_proxy({}.get)
self.assert_(callable(w))
def test_proxy_item_protocol(self):
w = self.new_proxy({})
self.assertRaises(KeyError, lambda: w[1])
w[1] = 'a'
self.assertEquals(w[1], 'a')
del w[1]
self.assertRaises(KeyError, lambda: w[1])
def del_w_1():
del w[1]
self.assertRaises(KeyError, del_w_1)
def test_wrapped_iterable(self):
a = [1, 2, 3]
b = []
for x in self.new_proxy(a):
b.append(x)
self.assertEquals(a, b)
def test_iteration_over_proxy(self):
# Wrap an iterator before starting iteration.
# PyObject_GetIter() will still be called on the proxy.
a = [1, 2, 3]
b = []
for x in self.new_proxy(iter(a)):
b.append(x)
self.assertEquals(a, b)
t = tuple(self.new_proxy(iter(a)))
self.assertEquals(t, (1, 2, 3))
def test_iteration_using_proxy(self):
# Wrap an iterator within the iteration protocol, expecting it
# still to work. PyObject_GetIter() will not be called on the
# proxy, so the tp_iter slot won't unwrap it.
class Iterable(object):
def __init__(self, test, data):
self.test = test
self.data = data
def __iter__(self):
return self.test.new_proxy(iter(self.data))
a = [1, 2, 3]
b = []
for x in Iterable(self, a):
b.append(x)
self.assertEquals(a, b)
def test_bool_wrapped_None(self):
w = self.new_proxy(None)
self.assertEquals(not w, 1)
# Numeric ops.
unops = [
"-x", "+x", "abs(x)", "~x",
"int(x)", "long(x)", "float(x)",
]
def test_unops(self):
P = self.new_proxy
for expr in self.unops:
x = 1
y = eval(expr)
x = P(1)
z = eval(expr)
self.assertEqual(z, y,
"x=%r; expr=%r" % (x, expr))
def test_odd_unops(self):
# unops that don't return a proxy
P = self.new_proxy
for func in hex, oct, lambda x: not x:
self.assertEqual(func(P(100)), func(100))
binops = [
"x+y", "x-y", "x*y", "x/y", "divmod(x, y)", "x**y", "x//y",
"x<<y", "x>>y", "x&y", "x|y", "x^y",
]
def test_binops(self):
P = self.new_proxy
for expr in self.binops:
first = 1
for x in [1, P(1)]:
for y in [2, P(2)]:
if first:
z = eval(expr)
first = 0
else:
self.assertEqual(eval(expr), z,
"x=%r; y=%r; expr=%r" % (x, y, expr))
def test_inplace(self):
# TODO: should test all inplace operators...
P = self.new_proxy
pa = P(1)
pa += 2
self.assertEqual(pa, 3)
a = [1, 2, 3]
pa = qa = P(a)
pa += [4, 5, 6]
self.failUnless(pa is qa)
self.assertEqual(a, [1, 2, 3, 4, 5, 6])
pa = P(2)
pa **= 2
self.assertEqual(pa, 4)
def test_coerce(self):
P = self.new_proxy
# Before 2.3, coerce() of two proxies returns them unchanged
fixed_coerce = sys.version_info >= (2, 3, 0)
x = P(1)
y = P(2)
a, b = coerce(x, y)
self.failUnless(a is x and b is y)
x = P(1)
y = P(2.1)
a, b = coerce(x, y)
self.failUnless(a == 1.0)
self.failUnless(b is y)
if fixed_coerce:
self.failUnless(a.__class__ is float, a.__class__)
x = P(1.1)
y = P(2)
a, b = coerce(x, y)
self.failUnless(a is x)
self.failUnless(b == 2.0)
if fixed_coerce:
self.failUnless(b.__class__ is float, b.__class__)
x = P(1)
y = 2
a, b = coerce(x, y)
self.failUnless(a is x)
self.failUnless(b is y)
x = P(1)
y = 2.1
a, b = coerce(x, y)
self.failUnless(a.__class__ is float, a.__class__)
self.failUnless(b is y)
x = P(1.1)
y = 2
a, b = coerce(x, y)
self.failUnless(a is x)
self.failUnless(b.__class__ is float, b.__class__)
x = 1
y = P(2)
a, b = coerce(x, y)
self.failUnless(a is x)
self.failUnless(b is y)
x = 1.1
y = P(2)
a, b = coerce(x, y)
self.failUnless(a is x)
self.failUnless(b.__class__ is float, b.__class__)
x = 1
y = P(2.1)
a, b = coerce(x, y)
self.failUnless(a.__class__ is float, a.__class__)
self.failUnless(b is y)
def test_isProxy():
"""
>>> from zope.proxy import ProxyBase, isProxy
>>> class P1(ProxyBase):
... pass
>>> class P2(ProxyBase):
... pass
>>> class C(object):
... pass
>>> c = C()
>>> int(isProxy(c))
0
>>> p = P1(c)
>>> int(isProxy(p))
1
>>> int(isProxy(p, P1))
1
>>> int(isProxy(p, P2))
0
>>> p = P2(p)
>>> int(isProxy(p, P1))
1
>>> int(isProxy(p, P2))
1
"""
def test_getProxiedObject():
"""
>>> from zope.proxy import ProxyBase, getProxiedObject
>>> class C(object):
... pass
>>> c = C()
>>> int(getProxiedObject(c) is c)
1
>>> p = ProxyBase(c)
>>> int(getProxiedObject(p) is c)
1
>>> p2 = ProxyBase(p)
>>> int(getProxiedObject(p2) is p)
1
"""
def test_ProxyIterator():
"""
>>> from zope.proxy import ProxyBase, ProxyIterator
>>> class C(object):
... pass
>>> c = C()
>>> p1 = ProxyBase(c)
>>> class P(ProxyBase):
... pass
>>> p2 = P(p1)
>>> p3 = ProxyBase(p2)
>>> list(ProxyIterator(p3)) == [p3, p2, p1, c]
1
"""
def test_removeAllProxies():
"""
>>> from zope.proxy import ProxyBase, removeAllProxies
>>> class C(object):
... pass
>>> c = C()
>>> int(removeAllProxies(c) is c)
1
>>> p = ProxyBase(c)
>>> int(removeAllProxies(p) is c)
1
>>> p2 = ProxyBase(p)
>>> int(removeAllProxies(p2) is c)
1
"""
def test_queryProxy():
"""
>>> from zope.proxy import ProxyBase, queryProxy
>>> class P1(ProxyBase):
... pass
>>> class P2(ProxyBase):
... pass
>>> class C(object):
... pass
>>> c = C()
>>> queryProxy(c, P1)
>>> queryProxy(c, P1, 42)
42
>>> p1 = P1(c)
>>> int(queryProxy(p1, P1) is p1)
1
>>> queryProxy(c, P2)
>>> queryProxy(c, P2, 42)
42
>>> p2 = P2(p1)
>>> int(queryProxy(p2, P1) is p1)
1
>>> int(queryProxy(p2, P2) is p2)
1
>>> int(queryProxy(p2, ProxyBase) is p2)
1
"""
def test_queryInnerProxy():
"""
>>> from zope.proxy import ProxyBase, queryProxy, queryInnerProxy
>>> class P1(ProxyBase):
... pass
>>> class P2(ProxyBase):
... pass
>>> class C(object):
... pass
>>> c = C()
>>> queryInnerProxy(c, P1)
>>> queryInnerProxy(c, P1, 42)
42
>>> p1 = P1(c)
>>> int(queryProxy(p1, P1) is p1)
1
>>> queryInnerProxy(c, P2)
>>> queryInnerProxy(c, P2, 42)
42
>>> p2 = P2(p1)
>>> int(queryInnerProxy(p2, P1) is p1)
1
>>> int(queryInnerProxy(p2, P2) is p2)
1
>>> int(queryInnerProxy(p2, ProxyBase) is p1)
1
>>> p3 = P1(p2)
>>> int(queryProxy(p3, P1) is p3)
1
>>> int(queryInnerProxy(p3, P1) is p1)
1
>>> int(queryInnerProxy(p3, P2) is p2)
1
"""
def test_sameProxiedObjects():
"""
>>> from zope.proxy import ProxyBase, sameProxiedObjects
>>> class C(object):
... pass
>>> c1 = C()
>>> c2 = C()
>>> int(sameProxiedObjects(c1, c1))
1
>>> int(sameProxiedObjects(ProxyBase(c1), c1))
1
>>> int(sameProxiedObjects(ProxyBase(c1), ProxyBase(c1)))
1
>>> int(sameProxiedObjects(ProxyBase(ProxyBase(c1)), c1))
1
>>> int(sameProxiedObjects(c1, ProxyBase(c1)))
1
>>> int(sameProxiedObjects(c1, ProxyBase(ProxyBase(c1))))
1
>>> int(sameProxiedObjects(c1, c2))
0
>>> int(sameProxiedObjects(ProxyBase(c1), c2))
0
>>> int(sameProxiedObjects(ProxyBase(c1), ProxyBase(c2)))
0
>>> int(sameProxiedObjects(ProxyBase(ProxyBase(c1)), c2))
0
>>> int(sameProxiedObjects(c1, ProxyBase(c2)))
0
>>> int(sameProxiedObjects(c1, ProxyBase(ProxyBase(c2))))
0
"""
def test_subclassing_proxies():
"""You can subclass ProxyBase
If you subclass a proxy, instances of the subclass have access to
data defined in the class, including descriptors.
Your subclass instances don't get instance dictionaries, but they
can have slots.
>>> class MyProxy(ProxyBase):
... __slots__ = 'x', 'y'
...
... def f(self):
... return self.x
>>> l = [1, 2, 3]
>>> p = MyProxy(l)
I can use attributes defined by the class, including slots:
>>> p.x = 'x'
>>> p.x
'x'
>>> p.f()
'x'
I can also use attributes of the proxied object:
>>> p
[1, 2, 3]
>>> p.pop()
3
>>> p
[1, 2]
"""
def test_suite():
suite = unittest.makeSuite(ProxyTestCase)
suite.addTest(DocTestSuite())
return suite
if __name__ == "__main__":
runner = unittest.TextTestRunner(sys.stdout)
result = runner.run(test_suite())
newerrs = len(result.errors) + len(result.failures)
sys.exit(newerrs and 1 or 0)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment