Commit 591184df authored by Julien Muchembled

wip (ZODB5)

parent 4dfdf05a
@@ -159,11 +159,8 @@ class Storage(BaseStorage.BaseStorage,
         except NEOStorageNotFoundError:
             raise POSException.POSKeyError(oid)
 
-    def sync(self, force=True):
-        # XXX: sync() is part of IMVCCStorage and we don't want to be called
-        # from afterCompletion() so it may not be a good place to ping the
-        # master here. See also monkey-patch in __init__.py
-        self.app.lastTransaction()
+    def sync(self):
+        return self.app.sync()
 
     def copyTransactionsFrom(self, source, verbose=False):
         """ Zope compliant API """
@@ -186,7 +183,7 @@ class Storage(BaseStorage.BaseStorage,
 
     def lastTransaction(self):
         # Used in ZODB unit tests
-        return self.app.lastTransaction()
+        return self.app.last_tid
 
     def _clear_temp(self):
         raise NotImplementedError
...
@@ -12,15 +12,11 @@
 #
 ##############################################################################
 
-import app # set up signal handlers early enough to do it in the main thread
-
-if 1:
+def patch():
     from hashlib import md5
     from ZODB.Connection import Connection
-
-    def _check(f, *args):
-        h = md5(f.func_code.co_code).hexdigest()
-        assert h in args, h
+    H = lambda f: md5(f.func_code.co_code).hexdigest()
 
     # Allow serial to be returned as late as tpc_finish
     #
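
The H helper above keeps the idea behind the old _check assertion: fingerprint a function's compiled bytecode so a monkey-patch is only applied to the exact upstream implementation it was written against, and is refused loudly after any upstream change. A minimal standalone sketch of that guard, in the same Python 2 style as the code above (bytecode_hash, greet and known_versions are illustrative names, not part of NEO):

    from hashlib import md5

    def bytecode_hash(f):
        # same fingerprint as the H lambda: the md5 changes whenever the
        # function is recompiled to different bytecode
        return md5(f.func_code.co_code).hexdigest()

    def greet():
        return 'hello'

    # a patch guarded by such a hash only applies when the upstream function
    # still matches one of the known versions
    known_versions = (bytecode_hash(greet),)
    assert bytecode_hash(greet) in known_versions
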
@@ -28,9 +24,7 @@
     # removing the requirement to serialise second commit phase (tpc_vote
     # to tpc_finish/tpc_abort).
 
-    _check(Connection.tpc_finish,
-        'ab9b1b8d82c40e5fffa84f7bc4ea3a8b', # Python 2.7
-    )
+    h = H(Connection.tpc_finish)
 
     def tpc_finish(self, transaction):
         """Indicate confirmation that the transaction is done."""
@@ -61,18 +55,28 @@
         # </patch>
         self._tpc_cleanup()
 
-    Connection.tpc_finish = tpc_finish
+    global OLD_ZODB
+    OLD_ZODB = h in (
+        'ab9b1b8d82c40e5fffa84f7bc4ea3a8b', # Python 2.7
+    )
+    if OLD_ZODB:
+        Connection.tpc_finish = tpc_finish
+    else:
+        # Merged upstream.
+        assert hasattr(Connection, '_warn_about_returned_serial')
 
-    # IStorage implementations usually need to provide a "network barrier",
-    # at least for NEO & ZEO, to make sure we have an up-to-date view of
-    # the storage. It's unclear whether sync() is a good place to do this
-    # because a round-trip to the server introduces latency and we prefer
-    # it's not done when it's not useful.
-    # For example, we know we are up-to-date after a successful commit,
-    # so this should not be done in afterCompletion(), and anyway, we don't
-    # know any legitimate use of DB access outside a transaction.
+    # sync() is used to provide a "network barrier", which is required for
+    # NEO & ZEO to make sure our view of the storage includes all changes done
+    # so far by other clients. But a round-trip to the server introduces
+    # latency so it must not be done when it's not useful. Note also that a
+    # successful commit (which ends with a response from the master) already
+    # acts as a "network barrier".
+    # BBB: What this monkey-patch does has been merged in ZODB5.
+    if not hasattr(Connection, '_flush_invalidations'):
+        return
 
-    _check(Connection.afterCompletion,
+    assert H(Connection.afterCompletion) in (
         'cd3a080b80fd957190ff3bb867149448', # Python 2.7
     )
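
The rewritten comment block above is the core of the change: sync() becomes an explicit, cheap "network barrier" instead of one hidden in afterCompletion(). A toy model of why a single ping round-trip is enough, assuming (as the comment implies) that the master answers requests on a connection in order, so every invalidation sent before the pong has already been delivered when sync() returns (ToyMaster and ToyClient are illustrative, not NEO classes):

    from collections import deque

    class ToyMaster(object):
        def __init__(self):
            self.outbox = deque()        # invalidations queued for the client
        def commit(self, tid):
            self.outbox.append(tid)      # pushed asynchronously in reality
        def ping(self, client):
            while self.outbox:           # everything sent before the pong...
                client.invalidate(self.outbox.popleft())
            return 'pong'                # ...arrives before the answer

    class ToyClient(object):
        def __init__(self, master):
            self.master = master
            self.last_tid = None
        def invalidate(self, tid):
            self.last_tid = tid
        def sync(self):
            self.master.ping(self)       # the "network barrier"

    master = ToyMaster()
    client = ToyClient(master)
    master.commit('tid1')
    client.sync()
    assert client.last_tid == 'tid1'     # view is as recent as the ping
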
@@ -81,3 +85,7 @@
         # PATCH: do not call sync()
         self._flush_invalidations()
     Connection.afterCompletion = afterCompletion
+
+patch()
+
+import app # set up signal handlers early enough to do it in the main thread
@@ -25,6 +25,7 @@ from functools import partial
 from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
 from ZODB.POSException import ReadConflictError
 from ZODB.ConflictResolution import ResolvedSerial
+from . import OLD_ZODB
 from persistent.TimeStamp import TimeStamp
 
 from neo.lib import logging
@@ -108,7 +109,6 @@ class Application(ThreadedApplication):
         self._loading_oid = None
         self.new_oid_list = ()
         self.last_oid = '\0' * 8
-        self.last_tid = None
         self.storage_event_handler = storage.StorageEventHandler(self)
         self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
         self.storage_handler = storage.StorageAnswersHandler(self)
@@ -136,8 +136,12 @@ class Application(ThreadedApplication):
         self.compress = compress
 
     def __getattr__(self, attr):
-        if attr == 'pt':
-            self._getMasterConnection()
+        if attr in ('last_tid', 'pt'):
+            if self._connecting_to_master_node.locked():
+                if attr == 'last_tid':
+                    return
+            else:
+                self._getMasterConnection()
         return self.__getattribute__(attr)
 
     def log(self):
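
Since last_tid is no longer set in __init__ (previous hunk), it is now materialised lazily like pt: touching either attribute connects to the master, except that reading last_tid while the connection is still being established simply yields None instead of blocking. A simplified, self-contained sketch of that pattern, assuming a plain lock in place of _connecting_to_master_node (LazyApp and _connect are hypothetical stand-ins):

    import threading

    class LazyApp(object):
        def __init__(self):
            self._connecting = threading.Lock()   # held while bootstrapping

        def _connect(self):
            # stand-in for _getMasterConnection(): the real handshake fills
            # in both the partition table and the last committed tid
            self.pt = object()
            self.last_tid = '\x00' * 8

        def __getattr__(self, attr):
            # only called when the attribute is not set yet
            if attr in ('last_tid', 'pt'):
                if self._connecting.locked():
                    if attr == 'last_tid':
                        return None   # unknown yet; don't wait for the master
                else:
                    self._connect()
            return self.__getattribute__(attr)

    app = LazyApp()
    assert app.last_tid == '\x00' * 8   # first access triggers the connection
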
@@ -609,7 +613,7 @@
                     logging.error('tpc_store failed')
                     raise NEOStorageError('tpc_store failed')
                 elif oid in resolved_oid_set:
-                    append((oid, ResolvedSerial))
+                    append((oid, ResolvedSerial) if OLD_ZODB else oid)
         return result
 
     def tpc_vote(self, transaction, tryToResolveConflict):
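
The OLD_ZODB switch above mirrors an API difference: with ZODB < 5 the client reports a conflict-resolved object as an (oid, ResolvedSerial) pair, while ZODB 5 only expects the oid itself. A hedged sketch of just that branch (conflict_result is a hypothetical helper and the marker constant stands in for ZODB.ConflictResolution.ResolvedSerial):

    RESOLVED_SERIAL = 'rs'   # assumed stand-in for ResolvedSerial

    def conflict_result(oid, old_zodb):
        # shape of one entry handed back to ZODB for a store whose
        # conflict was resolved by the client
        if old_zodb:
            return (oid, RESOLVED_SERIAL)   # ZODB < 5: (oid, marker) pair
        return oid                          # ZODB >= 5: bare oid

    assert conflict_result('\x00' * 8, True) == ('\x00' * 8, RESOLVED_SERIAL)
    assert conflict_result('\x00' * 8, False) == '\x00' * 8
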
@@ -962,9 +966,8 @@
 
     from .iterator import iterator
 
-    def lastTransaction(self):
-        self._askPrimary(Packets.AskLastTransaction())
-        return self.last_tid
+    def sync(self):
+        self._askPrimary(Packets.Ping())
 
     def pack(self, t):
         tid = TimeStamp(*time.gmtime(t)[:5] + (t % 60, )).raw()
...
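
With lastTransaction() now returning the cached app.last_tid (earlier hunk) and sync() reduced to a ping, the only network round-trip left at a transaction boundary is the barrier itself. Roughly, and only as an assumption about how the connection layer uses these two methods (the test near the end of this commit describes the ZODB5 behaviour), the intended division of work is:

    def begin_transaction(storage):
        # hypothetical caller, not ZODB code
        storage.sync()                    # one ping round-trip = the barrier
        return storage.lastTransaction()  # cached last_tid, no extra request
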
@@ -99,7 +99,10 @@ class PrimaryNotificationsHandler(MTEventHandler):
         app = self.app
         ltid = packet.decode()[0]
         if app.last_tid != ltid:
-            if app.master_conn is None:
+            # Either we're connecting or we already know the last tid
+            # via invalidations.
+            assert app.master_conn is None, app.master_conn
+            if 1:
                 app._cache_lock_acquire()
                 try:
                     if app.last_tid < ltid:
...
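
The assert added above encodes an invariant: once the client is connected, invalidation packets keep last_tid current, so this answer is only expected while (re)connecting, and last_tid must only ever move forward under the cache lock. A small hypothetical helper showing that forward-only update (ToyApp and update_last_tid are illustrative, not the handler's real API):

    import threading

    class ToyApp(object):
        last_tid = None

    def update_last_tid(app, ltid, lock):
        # mirror of the guarded update above: take the cache lock, then
        # only ever move last_tid forward
        with lock:
            if app.last_tid < ltid:   # None sorts before any str in Python 2
                app.last_tid = ltid

    app = ToyApp()
    lock = threading.Lock()
    update_last_tid(app, '\x00' * 7 + '\x02', lock)
    update_last_tid(app, '\x00' * 7 + '\x01', lock)   # older tid is ignored
    assert app.last_tid == '\x00' * 7 + '\x02'
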
@@ -63,7 +63,7 @@ def iterator(app, start=None, stop=None):
     """NEO transaction iterator"""
     if start is None:
         start = ZERO_TID
-    stop = min(stop or MAX_TID, app.lastTransaction())
+    stop = min(stop or MAX_TID, app.last_tid)
     while 1:
         max_tid, chunk = app.transactionLog(start, stop, CHUNK_LENGTH)
         if not chunk:
...
@@ -226,8 +226,9 @@ class MTEventHandler(EventHandler):
 
     def packetReceived(self, conn, packet, kw={}):
         """Redirect all received packet to dispatcher thread."""
-        if packet.isResponse() and type(packet) is not Packets.Pong:
-            if not self.dispatcher.dispatch(conn, packet.getId(), packet, kw):
+        if packet.isResponse():
+            if not (self.dispatcher.dispatch(conn, packet.getId(), packet, kw)
+                    or type(packet) is Packets.Pong):
                 raise ProtocolError('Unexpected response packet from %r: %r'
                                     % (conn, packet))
         else:
...
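
The rewritten condition above changes how Pong is handled: it now goes through the dispatcher like any other answer, so a thread blocked in sync() can be woken by it, and only an unclaimed Pong is silently dropped instead of raising a protocol error. A minimal model of that rule (ToyDispatcher and handle_response are illustrative, not the real classes):

    class ToyDispatcher(object):
        def __init__(self):
            self.waiting = {}        # msg_id -> list used as an answer queue

        def register(self, msg_id, queue):
            self.waiting[msg_id] = queue

        def dispatch(self, msg_id, packet):
            queue = self.waiting.pop(msg_id, None)
            if queue is None:
                return False         # nobody asked for this answer
            queue.append(packet)
            return True

    def handle_response(dispatcher, msg_id, packet, is_pong):
        # unexpected answers are fatal, except a stray Pong
        if not (dispatcher.dispatch(msg_id, packet) or is_pong):
            raise ValueError('unexpected response packet')

    d = ToyDispatcher()
    d.register(1, [])
    handle_response(d, 1, 'pong', True)   # expected: delivered to the waiter
    handle_response(d, 2, 'pong', True)   # stray Pong: ignored
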
@@ -38,6 +38,7 @@ class Dispatcher(object):
 
     def _getMasterConnection(self):
         if self.master_conn is None:
+            self.last_tid = None
             self.uuid = 1 + (UUID_NAMESPACES[NodeTypes.CLIENT] << 24)
             self.num_partitions = 10
             self.num_replicas = 1
...
@@ -335,7 +335,7 @@ class ClientTests(NEOFunctionalTest):
         t3.description = 'desc'
         st3.tpc_begin(t3)
         # retrieve the last revision
-        data, serial = st3.load(oid, '')
+        data, serial = st3.load(oid)
         # try to store again, should not be delayed
         st3.store(oid, serial, data, '', t3)
         # the vote should not timeout
...
@@ -630,7 +630,8 @@ class Test(NEOThreadedTest):
             t.begin()
             s0.stop() # force client to ask s1
             self.assertEqual(sorted(c.root()), [1])
-            t0, t1 = c._storage.iterator()
+            self.tic()
+            t0, t1 = c.db().storage.iterator()
         finally:
             cluster.stop()
@@ -885,7 +886,7 @@
 
             # Now test cache invalidation during a load from a storage
             ll = LockLock()
-            def _loadFromStorage(orig, *args):
+            def break_after(orig, *args):
                 try:
                     return orig(*args)
                 finally:
@@ -893,7 +894,7 @@
                     x2._p_deactivate()
                     # Remove last version of x from cache
                     cache._remove(cache._oid_dict[x2._p_oid].pop())
-            with ll, Patch(cluster.client, _loadFromStorage=_loadFromStorage):
+            with ll, Patch(cluster.client, _loadFromStorage=break_after):
                 t = self.newThread(x2._p_activate)
                 ll()
                 # At this point, x could not be found in the cache and the result
@@ -911,16 +912,18 @@
             self.assertEqual(x2.value, 1)
             self.assertEqual(x1.value, 0)
 
-            # l1 is acquired and l2 is released
+            def invalidations(conn):
+                try:
+                    return conn._storage._invalidations
+                except AttributeError: # BBB: ZODB < 5
+                    return conn._invalidated
+
             # Change x again from 0 to 1, while the checking connection c1
             # is suspended at the beginning of the transaction t1,
             # between Storage.sync() and flush of invalidations.
-            def _flush_invalidations(orig):
-                ll()
-                orig()
             x1._p_deactivate()
             t1.abort()
-            with ll, Patch(c1, _flush_invalidations=_flush_invalidations):
+            with ll, Patch(c1._storage, sync=break_after):
                 t = self.newThread(t1.begin)
                 ll()
                 txn = transaction.Transaction()
@@ -928,10 +931,14 @@
                 client.store(x2._p_oid, tid, y, '', txn)
                 tid = client.tpc_finish(txn, None)
                 client.close()
+                self.assertEqual(invalidations(c1), {x1._p_oid})
                 t.join()
-            # A transaction really begins when it acquires the lock to flush
-            # invalidations. The previous lastTransaction() only does a ping
-            # to make sure we have a recent enough view of the DB.
+            # A transaction really begins when it gets the last tid from the
+            # storage, just before flushing invalidations (on ZODB < 5, it's
+            # when it acquires the lock to flush invalidations). The previous
+            # call to sync() only does a ping to make sure we have a recent
+            # enough view of the DB.
+            self.assertFalse(invalidations(c1))
             self.assertEqual(x1.value, 1)
         finally:
@@ -970,7 +977,7 @@
             # transaction before the last one, and clearing the cache before
             # reloading x.
             c1._storage.load(x._p_oid)
-            t0, t1, t2 = c1._storage.iterator()
+            t0, t1, t2 = c1.db().storage.iterator()
             self.assertEqual(map(u64, t0.oid_list), [0])
             self.assertEqual(map(u64, t1.oid_list), [0, 1])
             # Check oid 1 is part of transaction metadata.
...
@@ -373,14 +373,14 @@ class ReplicationTests(NEOThreadedTest):
             self.tic()
             s1.stop()
             cluster.join((s1,))
-            t0, t1, t2 = c._storage.iterator()
+            t0, t1, t2 = c.db().storage.iterator()
             s1.resetNode()
             s1.start()
             self.tic()
             self.assertEqual([], cluster.getOutdatedCells())
             s0.stop()
             cluster.join((s0,))
-            t0, t1, t2 = c._storage.iterator()
+            t0, t1, t2 = c.db().storage.iterator()
         finally:
             cluster.stop()
...