Commit 1c1688c0 authored by Joshua Woelfel's avatar Joshua Woelfel

Added ZODB functionality to support zconn_at

parent d8a8c0fc
...@@ -29,6 +29,7 @@ from weakref import WeakSet ...@@ -29,6 +29,7 @@ from weakref import WeakSet
import gc import gc
from persistent.TimeStamp import TimeStamp from persistent.TimeStamp import TimeStamp
import time import time
from ZODB.utils import z64
import pkg_resources import pkg_resources
...@@ -146,7 +147,6 @@ def zconn_at(zconn): # -> tid ...@@ -146,7 +147,6 @@ def zconn_at(zconn): # -> tid
assert isinstance(zconn, ZODB.Connection.Connection) assert isinstance(zconn, ZODB.Connection.Connection)
if zconn.opened is None: # zconn must be in "opened" state if zconn.opened is None: # zconn must be in "opened" state
raise POSException.ConnectionStateError("database connection is closed") raise POSException.ConnectionStateError("database connection is closed")
# ZODB5 uses MVCC uniformly # ZODB5 uses MVCC uniformly
# #
# zconn.db._storage always provides IMVCCStorage - either raw storage provides it, # zconn.db._storage always provides IMVCCStorage - either raw storage provides it,
...@@ -186,34 +186,12 @@ def zconn_at(zconn): # -> tid ...@@ -186,34 +186,12 @@ def zconn_at(zconn): # -> tid
# later db state exists, returns tid of db state previous to later # later db state exists, returns tid of db state previous to later
# state # state
else: else:
if __isHistoricConnection(zconn): zconn._inv_lock.acquire()
return before2at(zconn.before) try:
else: viewed_state = zconn.viewed_db_state
last_transaction = zconn.db().lastTransaction() finally:
if __viewingLatestState(zconn): zconn._inv_lock.release()
return last_transaction return viewed_state
else:
return __tidOfDbStateBefore(zconn._storage, before2at(zconn._txn_time))
# looks through OOBTree storing all tids of storage to find the tid previous
# to before
def __tidOfDbStateBefore(storage, before):
# lookup should be O(log n) but unsure of the overhead space
# implication of this operation, perhaps there is a less
# expensive way of getting this value
return storage._transactions.maxKey(before)
# returns true/false if zconn is viewing latest db state at time of
# function call
def __viewingLatestState(zconn):
return True if zconn._txn_time is None else False
# returns true/false if zconn is a historical connection
def __isHistoricConnection(zconn):
if zmajor >= 5:
return isinstance(zconn._storage, ZODB.mvccadapter.HistoricalStorageAdapter)
else:
return True if not(zconn.before is None) else False
# before2at converts tid that specifies database state as "before" into tid that # before2at converts tid that specifies database state as "before" into tid that
# specifies database state as "at". # specifies database state as "at".
...@@ -319,6 +297,160 @@ else: ...@@ -319,6 +297,160 @@ else:
pass # raises in onResyncCallback pass # raises in onResyncCallback
if zmajor == 4:
orig_connection_constructor = ZODB.Connection.Connection.__init__
def connection_constructor(self, db, cache_size=400, before=None, cache_size_bytes=0):
orig_connection_constructor(self, db, cache_size, before, cache_size_bytes)
# viewed state of the db
self.viewed_db_state = self._db.latest_state
ZODB.Connection.Connection.__init__ = connection_constructor
orig_db_constructor = ZODB.DB.__init__
def db_constructor(self, storage,
pool_size=7,
pool_timeout=1<<31,
cache_size=400,
cache_size_bytes=0,
historical_pool_size=3,
historical_cache_size=1000,
historical_cache_size_bytes=0,
historical_timeout=300,
database_name='unnamed',
databases=None,
xrefs=True,
large_record_size=1<<24,
**storage_args):
orig_db_constructor(self, storage,
pool_size,
pool_timeout,
cache_size,
cache_size_bytes,
historical_pool_size,
historical_cache_size,
historical_cache_size_bytes,
historical_timeout,
database_name,
databases,
xrefs,
large_record_size,
**storage_args)
# initialize viewed state of db for new connections being opened
last_tid = self.lastTransaction()
self.history(z64)
self._a()
try:
if self.latest_state is None:
self.latest_state = last_tid
finally:
self._r()
ZODB.DB.__init__= db_constructor
def db_connectionMap(self, f, tid=None):
"""Call f(c) for all connections c in all pools, live and historical.
"""
self._a()
try:
# updating latest_state for zconn_at
if not (tid is None):
self.latest_state = tid
self.pool.map(f)
self.historical_pool.map(f)
finally:
self._r()
ZODB.DB._connectionMap = db_connectionMap
def db_invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
change to the object. The connection commiting the change should be
passed in to prevent useless (but harmless) messages to the
connection.
"""
# Storages, esp. ZEO tests, need the version argument still. :-/
assert version==''
# Notify connections.
def inval(c):
if c is not connection:
c.invalidate(tid, oids)
else:
# update resynced connection's view of db
c._inv_lock.acquire()
try:
c.viewed_db_state = tid
finally:
c._inv_lock.release()
self._connectionMap(inval, tid)
ZODB.DB.invalidate = db_invalidate
def connection_flush_invalidations(self):
if self._mvcc_storage:
# Poll the storage for invalidations.
invalidated = self._storage.poll_invalidations()
if invalidated is None:
# special value: the transaction is so old that
# we need to flush the whole cache.
self._cache.invalidate(list(self._cache.cache_data.keys()))
elif invalidated:
self._cache.invalidate(invalidated)
self._db._a()
self._inv_lock.acquire()
try:
# Non-ghostifiable objects may need to read when they are
# invalidated, so we'll quickly just replace the
# invalidating dict with a new one. We'll then process
# the invalidations after freeing the lock *and* after
# resetting the time. This means that invalidations will
# happen after the start of the transactions. They are
# subject to conflict errors and to reading old data.
# TODO: There is a potential problem lurking for persistent
# classes. Suppose we have an invalidation of a persistent
# class and of an instance. If the instance is
# invalidated first and if the invalidation logic uses
# data read from the class, then the invalidation could
# be performed with stale data. Or, suppose that there
# are instances of the class that are freed as a result of
# invalidating some object. Perhaps code in their __del__
# uses class data. Really, the only way to properly fix
# this is to, in fact, make classes ghostifiable. Then
# we'd have to reimplement attribute lookup to check the
# class state and, if necessary, activate the class. It's
# much worse than that though, because we'd also need to
# deal with slots. When a class is ghostified, we'd need
# to replace all of the slot operations with versions that
# reloaded the object when called. It's hard to say which
# is better or worse. For now, it seems the risk of
# using a class while objects are being invalidated seems
# small enough to be acceptable.
invalidated = dict.fromkeys(self._invalidated)
self._invalidated = set()
self._txn_time = None
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy()
# update resynced connection's view of the db
self.viewed_db_state = self._db.latest_state
finally:
self._inv_lock.release()
self._db._r()
self._cache.invalidate(invalidated)
# Now is a good time to collect some garbage.
self._cache.incrgc()
ZODB.Connection.Connection._flush_invalidations = connection_flush_invalidations
# zstor_2zurl converts a ZODB storage to URL to access it. # zstor_2zurl converts a ZODB storage to URL to access it.
def zstor_2zurl(zstor): def zstor_2zurl(zstor):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment