Commit 3a6a3475 authored by Kirill Smelkov

Sync with NEO/py v1.12-13-gf2ea4be2 (oldproto branch)

* origin/old-proto:
  qa: skip broken ZODB test
  client: fix race with invalidations when starting a new transaction on ZODB 5
  Code clean-up, comment fixes
  master: fix crash in STARTING_BACKUP when connecting to an upstream secondary master
  mysql: workaround for MDEV-20693
  client: inline Application._loadFromCache
  client: replace global load lock by a per-oid one
  client: unindent code
  client: remove load lock in tpc_finish
  qa: check cache in testExternalInvalidation
  qa: comment testExternalInvalidation2
parents a4bf053e f2ea4be2
@@ -17,7 +17,7 @@
 import heapq
 import random
 import time
+from collections import defaultdict
 try:
     from ZODB._compat import dumps, loads, _protocol
 except ImportError:
@@ -79,7 +79,7 @@ class Application(ThreadedApplication):
         # no self-assigned NID, primary master will supply us one
         self._cache = ClientCache() if cache_size is None else \
                       ClientCache(max_size=cache_size)
-        self._loading_oid = None
+        self._loading = defaultdict(lambda: (Lock(), []))
         self.new_oid_list = ()
         self.last_oid = '\0' * 8
         self.storage_event_handler = storage.StorageEventHandler(self)
@@ -90,19 +90,13 @@ class Application(ThreadedApplication):
         self.notifications_handler = master.PrimaryNotificationsHandler(self)
         self._txn_container = TransactionContainer()
         # Lock definition :
-        # _load_lock is used to make loading and storing atomic
-        lock = Lock()
-        self._load_lock_acquire = lock.acquire
-        self._load_lock_release = lock.release
         # _oid_lock is used in order to not call multiple oid
         # generation at the same time
         lock = Lock()
         self._oid_lock_acquire = lock.acquire
         self._oid_lock_release = lock.release
-        lock = Lock()
         # _cache_lock is used for the client cache
-        self._cache_lock_acquire = lock.acquire
-        self._cache_lock_release = lock.release
+        self._cache_lock = Lock()
         # _connecting_to_master_node is used to prevent simultaneous master
         # node connection attempts
         self._connecting_to_master_node = Lock()
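
Several hunks in this sync apply the same mechanical conversion: a Lock exposed through acquire/release bound-method pairs becomes a plain Lock used as a context manager. A minimal before/after sketch of the pattern (illustrative names only, not NEO's API):

    from threading import Lock

    class Cache(object):
        def __init__(self):
            self._lock = Lock()
            self._data = {}

        def clear_old_style(self):
            # acquire/release pairs need an explicit try/finally for safety
            self._lock.acquire()
            try:
                self._data.clear()
            finally:
                self._lock.release()

        def clear_new_style(self):
            # the with statement releases the lock on every exit path
            with self._lock:
                self._data.clear()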
@@ -397,21 +391,32 @@ class Application(ThreadedApplication):
         """
         # TODO:
         # - rename parameters (here? and in handlers & packet definitions)
-        acquire = self._cache_lock_acquire
-        release = self._cache_lock_release
-        # XXX: Consider using a more fine-grained lock.
-        self._load_lock_acquire()
+        acquired = False
+        lock = self._cache_lock
         try:
-            acquire()
-            try:
-                result = self._loadFromCache(oid, tid, before_tid)
-                if result:
-                    return result
-                self._loading_oid = oid
-                self._loading_invalidated = []
-            finally:
-                release()
+            while 1:
+                with lock:
+                    if tid:
+                        result = self._cache.load(oid, tid + '*')
+                        assert not result or result[1] == tid
+                    else:
+                        result = self._cache.load(oid, before_tid)
+                    if result:
+                        return result
+                    load_lock = self._loading[oid][0]
+                    acquired = load_lock.acquire(0)
+                # Several concurrent cache misses for the same oid are probably
+                # for the same tid so we use a per-oid lock to avoid asking the
+                # same data to the storage node.
+                if acquired:
+                    # The first thread does load from storage,
+                    # and fills cache with the response.
+                    break
+                # The other threads wait for the first one to complete and
+                # loop, possibly resulting in a new cache miss if a different
+                # tid is actually wanted or if the data was too big.
+                with load_lock:
+                    pass
             # While the cache lock is released, an arbitrary number of
             # invalidations may be processed, for this oid or not. And at this
             # precise moment, if both tid and before_tid are None (which is
@@ -427,20 +432,24 @@ class Application(ThreadedApplication):
             # we got from master.
             before_tid = p64(u64(self.last_tid) + 1)
             data, tid, next_tid, _ = self._loadFromStorage(oid, tid, before_tid)
-            acquire()
-            try:
-                if self._loading_oid:
+            with lock:
+                loading = self._loading.pop(oid, None)
+                if loading:
+                    assert loading[0] is load_lock
                     if not next_tid:
-                        for t in self._loading_invalidated:
+                        for t in loading[1]:
                             if tid < t:
                                 next_tid = t
                                 break
                     self._cache.store(oid, data, tid, next_tid)
                 # Else, we just reconnected to the master.
-            finally:
-                release()
-        finally:
-            self._load_lock_release()
+                load_lock.release()
+        except:
+            if acquired:
+                with lock:
+                    self._loading.pop(oid, None)
+                load_lock.release()
+            raise
         return data, tid, next_tid
 
     def _loadFromStorage(self, oid, at_tid, before_tid):
@@ -459,16 +468,6 @@ class Application(ThreadedApplication):
             Packets.AskObject(oid, at_tid, before_tid),
             askStorage)
 
-    def _loadFromCache(self, oid, at_tid=None, before_tid=None):
-        """
-        Load from local cache, return None if not found.
-        """
-        if at_tid:
-            result = self._cache.load(oid, at_tid + '*')
-            assert not result or result[1] == at_tid
-            return result
-        return self._cache.load(oid, before_tid)
-
     def tpc_begin(self, storage, transaction, tid=None, status=' '):
         """Begin a new transaction."""
         # First get a transaction, only one is allowed at a time
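
This is the core of "client: replace global load lock by a per-oid one": loads of distinct oids no longer serialize behind a single _load_lock, while concurrent cache misses for the same oid still trigger only one request to the storage node. A standalone sketch of the pattern, with simplified names (cache, fetch) standing in for NEO's ClientCache and _loadFromStorage:

    from collections import defaultdict
    from threading import Lock

    cache = {}                                   # {key: value}
    cache_lock = Lock()                          # protects cache and loading
    loading = defaultdict(lambda: (Lock(), []))  # {key: (per-key lock, invalidated tids)}

    def load(key, fetch):
        acquired = False
        while 1:
            with cache_lock:
                try:
                    return cache[key]            # cache hit
                except KeyError:
                    load_lock = loading[key][0]
                    acquired = load_lock.acquire(0)  # non-blocking
            if acquired:
                break            # first thread goes to the backend
            with load_lock:      # others wait for it, then retry the cache
                pass
        try:
            value = fetch(key)
            with cache_lock:
                loading.pop(key, None)
                cache[key] = value
            load_lock.release()
            return value
        except:
            with cache_lock:
                loading.pop(key, None)
            load_lock.release()
            raise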
@@ -670,7 +669,7 @@ class Application(ThreadedApplication):
         txn_context = self._txn_container.pop(transaction)
         if txn_context is None:
             return
-        # We want that the involved nodes abort a transaction after any
+        # We want the involved nodes to abort a transaction after any
         # other packet sent by the client for this transaction. IOW, if we
         # already have a connection with a storage node, potentially with
         # a pending write, aborting only via the master may lead to a race
@@ -699,9 +698,8 @@ class Application(ThreadedApplication):
                     txn_context.conn_dict))
         except ConnectionClosed:
             pass
-        # We don't need to flush queue, as it won't be reused by future
-        # transactions (deleted on next line & indexed by transaction object
-        # instance).
+        # No need to flush queue, as it will be destroyed on return,
+        # along with txn_context.
         self.dispatcher.forget_queue(txn_context.queue, flush_queue=False)
 
     def tpc_finish(self, transaction, f=None):
@@ -724,28 +722,22 @@ class Application(ThreadedApplication):
         txn_container = self._txn_container
         if not txn_container.get(transaction).voted:
             self.tpc_vote(transaction)
-        checked_list = []
-        self._load_lock_acquire()
+        txn_context = txn_container.pop(transaction)
+        cache_dict = txn_context.cache_dict
+        checked_list = [oid for oid, data in cache_dict.iteritems()
+                        if data is CHECKED_SERIAL]
+        for oid in checked_list:
+            del cache_dict[oid]
+        ttid = txn_context.ttid
+        p = Packets.AskFinishTransaction(ttid, cache_dict, checked_list)
         try:
-            # Call finish on master
-            txn_context = txn_container.pop(transaction)
-            cache_dict = txn_context.cache_dict
-            checked_list = [oid for oid, data in cache_dict.iteritems()
-                            if data is CHECKED_SERIAL]
-            for oid in checked_list:
-                del cache_dict[oid]
-            ttid = txn_context.ttid
-            p = Packets.AskFinishTransaction(ttid, cache_dict, checked_list)
-            try:
-                tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
-                assert tid
-            except ConnectionClosed:
-                tid = self._getFinalTID(ttid)
-                if not tid:
-                    raise
-            return tid
-        finally:
-            self._load_lock_release()
+            tid = self._askPrimary(p, cache_dict=cache_dict, callback=f)
+            assert tid
+        except ConnectionClosed:
+            tid = self._getFinalTID(ttid)
+            if not tid:
+                raise
+        return tid
 
     def _getFinalTID(self, ttid):
         try:
@@ -991,11 +983,8 @@ class Application(ThreadedApplication):
         # It should not be otherwise required (clients should be free to load
         # old data as long as it is available in cache, even if it was pruned
         # by a pack), so don't bother invalidating on other clients.
-        self._cache_lock_acquire()
-        try:
+        with self._cache_lock:
             self._cache.clear()
-        finally:
-            self._cache_lock_release()
 
     def getLastTID(self, oid):
         return self.load(oid)[1]
...
+# -*- coding: utf-8 -*-
 #
 # Copyright (C) 2006-2019 Nexedi SA
 #
@@ -45,8 +46,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
             # Either we're connecting or we already know the last tid
             # via invalidations.
             assert app.master_conn is None, app.master_conn
-            app._cache_lock_acquire()
-            try:
+            with app._cache_lock:
                 if app_last_tid < ltid:
                     app._cache.clear_current()
                     # In the past, we tried not to invalidate the
@@ -60,9 +60,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
                     app._cache.clear()
                 # Make sure a parallel load won't refill the cache
                 # with garbage.
-                app._loading_oid = app._loading_invalidated = None
-            finally:
-                app._cache_lock_release()
+                app._loading.clear()
             db = app.getDB()
             db is None or db.invalidateCache()
             app.last_tid = ltid
@@ -70,21 +68,22 @@ class PrimaryNotificationsHandler(MTEventHandler):
     def answerTransactionFinished(self, conn, _, tid, callback, cache_dict):
         app = self.app
-        app.last_tid = tid
-        # Update cache
         cache = app._cache
-        app._cache_lock_acquire()
-        try:
+        invalidate = cache.invalidate
+        loading_get = app._loading.get
+        with app._cache_lock:
             for oid, data in cache_dict.iteritems():
                 # Update ex-latest value in cache
-                cache.invalidate(oid, tid)
+                invalidate(oid, tid)
+                loading = loading_get(oid)
+                if loading:
+                    loading[1].append(tid)
                 if data is not None:
                     # Store in cache with no next_tid
                     cache.store(oid, data, tid, None)
             if callback is not None:
                 callback(tid)
-        finally:
-            app._cache_lock_release()
+            app.last_tid = tid # see comment in invalidateObjects
 
     def connectionClosed(self, conn):
         app = self.app
@@ -112,20 +111,24 @@ class PrimaryNotificationsHandler(MTEventHandler):
         app = self.app
         if app.ignore_invalidations:
             return
-        app.last_tid = tid
-        app._cache_lock_acquire()
-        try:
+        with app._cache_lock:
             invalidate = app._cache.invalidate
-            loading = app._loading_oid
+            loading_get = app._loading.get
             for oid in oid_list:
                 invalidate(oid, tid)
-                if oid == loading:
-                    app._loading_invalidated.append(tid)
+                loading = loading_get(oid)
+                if loading:
+                    loading[1].append(tid)
             db = app.getDB()
             if db is not None:
                 db.invalidate(tid, oid_list)
-        finally:
-            app._cache_lock_release()
+            # ZODB<5: Update before releasing the lock so that app.load
+            #         asks the last serial (with respect to already processed
+            #         invalidations by Connection._setstate).
+            # ZODB≥5: Update after db.invalidate because the MVCC
+            #         adapter starts at the greatest TID between
+            #         IStorage.lastTransaction and processed invalidations.
+            app.last_tid = tid
 
     def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
         pt = self.app.pt = object.__new__(PartitionTable)
...
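
The new comment block is the subtle part of the race fix with invalidations on ZODB 5: the MVCC adapter starts new transactions at the greatest TID between IStorage.lastTransaction() and the invalidations it has processed, so last_tid may only advance after db.invalidate ran, and both must happen under the cache lock. A toy model of the required ordering (invented names, not the real handler):

    from threading import Lock

    class InvalidationBookkeeping(object):
        def __init__(self, notify_db):
            self._lock = Lock()          # plays the role of _cache_lock
            self._notify_db = notify_db  # plays the role of db.invalidate
            self.last_tid = 0

        def invalidate(self, tid, oids):
            with self._lock:
                # 1. Hand the invalidated oids to the DB/MVCC layer first.
                self._notify_db(tid, oids)
                # 2. Only then advance last_tid: a transaction beginning now
                #    may pick last_tid as its snapshot, so all invalidations
                #    up to that tid must already be known to the DB layer.
                self.last_tid = tid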
@@ -50,7 +50,7 @@ class Transaction(object):
         self.conflict_dict = {} # {oid: serial}
         # resolved conflicts
         self.resolved_dict = {} # {oid: serial}
-        # involved storage nodes; connection is None is connection was lost
+        # involved storage nodes; connection is None if connection was lost
         self.conn_dict = {} # {node_id: connection}
 
     def __repr__(self):
...
@@ -197,8 +197,7 @@ elif IF == 'trace-cache':
     @defer
     def profile(app):
-        app._cache_lock_acquire()
-        try:
+        with app._cache_lock:
             cache = app._cache
             if type(cache) is ClientCache:
                 app._cache = CacheTracer(cache, '%s-%s.neo-cache-trace' %
@@ -206,5 +205,3 @@ elif IF == 'trace-cache':
                 app._cache.clear()
             else:
                 app._cache = cache.close()
-        finally:
-            app._cache_lock_release()
@@ -588,8 +588,8 @@ class Application(BaseApplication):
             node.send(Packets.StartOperation(self.backup_tid))
             uuid = node.getUUID()
             assert uuid not in self.storage_starting_set
-            if uuid not in self.storage_ready_dict:
-                self.storage_starting_set.add(uuid)
+            assert uuid not in self.storage_ready_dict
+            self.storage_starting_set.add(uuid)
 
     def setStorageReady(self, uuid):
         self.storage_starting_set.remove(uuid)
...
@@ -66,6 +66,7 @@ There is no conflict of node id between the 2 clusters:
 class BackupApplication(object):
 
     pt = None
+    server = None # like in BaseApplication
     uuid = None
 
     def __init__(self, app, name, master_addresses):
...
@@ -781,11 +781,19 @@ class MySQLDatabaseManager(DatabaseManager):
         if max_tid is not None:
             sql += " AND tid <= %d" % max_tid
         q = self.query
-        q("DELETE FROM trans" + sql)
+        if q("SELECT 1 FROM trans%s LIMIT 1" % sql):
+            q("DELETE FROM trans" + sql)
+        else:
+            logging.info("Nothing to truncate in trans for partition %s",
+                         partition)
         sql = " FROM obj" + sql
         data_id_list = [x for x, in q(
             "SELECT DISTINCT data_id%s AND data_id IS NOT NULL" % sql)]
-        q("DELETE" + sql)
+        if q("SELECT 1%s LIMIT 1" % sql):
+            q("DELETE" + sql)
+        else:
+            logging.info("Nothing to truncate in obj for partition %s",
+                         partition)
         self._pruneData(data_id_list)
 
     def getTransaction(self, tid, all = False):
...
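
The mysql hunk is the MDEV-20693 workaround from the commit list: a cheap SELECT ... LIMIT 1 probes for matching rows first, and the DELETE is skipped (and logged) when there is nothing to remove. The guard in isolation, as a sketch on top of the MySQLdb module (table and WHERE clause are placeholders, not necessarily NEO's schema):

    import MySQLdb

    def delete_if_any(conn, table, where):
        # Probe first; only run the DELETE when at least one row matches,
        # which is what the truncate code above does for trans and obj.
        c = conn.cursor()
        c.execute("SELECT 1 FROM %s%s LIMIT 1" % (table, where))
        if c.fetchone():
            c.execute("DELETE FROM %s%s" % (table, where))
        c.close()

A call like delete_if_any(conn, "trans", " WHERE tid <= 1000") then mirrors the first branch of the patched method.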
@@ -34,7 +34,7 @@ class ClientOperationHandler(BaseHandler):
         app = self.app
         if app.operational:
             # Even if in most cases, abortFor is called from both this method
-            # and BaseMasterHandler.notifyPartitionChanges (especially since
+            # and BaseMasterHandler.notifyNodeInformation (especially since
             # storage nodes disconnect unknown clients on their own), these 2
             # handlers also cover distinct scenarios, so neither of them is
             # redundant:
...
@@ -139,10 +139,11 @@ class TransactionManager(EventQueue):
     def replicating(self, offset_list):
         self._replicating.update(offset_list)
-        isdisjoint = set(offset_list).isdisjoint
-        assert isdisjoint(self._replicated), (offset_list, self._replicated)
-        assert isdisjoint(map(self.getPartition, self._store_lock_dict)), (
-            offset_list, self._store_lock_dict)
+        if __debug__:
+            isdisjoint = set(offset_list).isdisjoint
+            assert isdisjoint(self._replicated), (offset_list, self._replicated)
+            assert isdisjoint(map(self.getPartition, self._store_lock_dict)), (
+                offset_list, self._store_lock_dict)
         p = Packets.AskUnfinishedTransactions(offset_list)
         self._app.master_conn.ask(p, offset_list=offset_list)
...
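
Wrapping the assertions in if __debug__: makes the intent explicit: the isdisjoint helper exists only for the asserts, and CPython removes both the assert statements and the guarded block when running with -O, so the set construction costs nothing in optimized runs. A generic example of the pattern:

    def checked_update(dst, src):
        if __debug__:
            # Everything in this block, including the helper set, is
            # compiled away under `python -O`; a bare assert would also
            # be dropped, but setup code outside an assert would not.
            overlap = set(src).intersection(dst)
            assert not overlap, overlap
        dst.update(src)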
@@ -1120,8 +1120,7 @@ class NEOThreadedTest(NeoTestBase):
         def run(self):
             try:
-                apply(*self.__target)
-                self.__exc_info = None
+                self.__result = apply(*self.__target)
             except:
                 self.__exc_info = sys.exc_info()
                 if self.__exc_info[0] is NEOThreadedTest.failureException:
@@ -1129,10 +1128,13 @@ class NEOThreadedTest(NeoTestBase):
         def join(self, timeout=None):
             threading.Thread.join(self, timeout)
-            if not self.is_alive() and self.__exc_info:
-                etype, value, tb = self.__exc_info
-                del self.__exc_info
-                raise etype, value, tb
+            if not self.is_alive():
+                try:
+                    return self.__result
+                except AttributeError:
+                    etype, value, tb = self.__exc_info
+                    del self.__exc_info
+                    raise etype, value, tb
 
     class newThread(newPausedThread):
...
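
The test-helper change makes join() return whatever the thread's target returned, instead of only re-raising a stored exception; the new tests below rely on this (e.g. load_same.join()). A stripped-down standalone version of the idea, without NEO's pause and failure-reporting machinery:

    import sys
    import threading

    class ResultThread(threading.Thread):
        # join() returns the target's result, or re-raises its exception.

        def __init__(self, target, *args):
            threading.Thread.__init__(self)
            self.__target = target, args

        def run(self):
            try:
                target, args = self.__target
                self.__result = target(*args)
            except:
                self.__exc_info = sys.exc_info()

        def join(self, timeout=None):
            threading.Thread.join(self, timeout)
            if not self.is_alive():
                try:
                    return self.__result
                except AttributeError:
                    # no result was set: the target must have raised
                    raise self.__exc_info[1]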
+# -*- coding: utf-8 -*-
 #
 # Copyright (C) 2011-2019 Nexedi SA
 #
@@ -865,6 +866,27 @@ class Test(NEOThreadedTest):
         self.assertEqual(c.root()['1'].value, 1)
         self.assertNotIn('2', c.root())
 
+    @with_cluster()
+    def testLoadVsFinish(self, cluster):
+        t1, c1 = cluster.getTransaction()
+        c1.root()['x'] = x1 = PCounter()
+        t1.commit()
+        t1.begin()
+        x1.value = 1
+        t2, c2 = cluster.getTransaction()
+        x2 = c2.root()['x']
+        cluster.client._cache.clear()
+        def _loadFromStorage(orig, *args):
+            r = orig(*args)
+            ll()
+            return r
+        with LockLock() as ll, Patch(cluster.client,
+                _loadFromStorage=_loadFromStorage):
+            t = self.newThread(x2._p_activate)
+            ll()
+            t1.commit()
+        t.join()
+
     @with_cluster()
     def testInternalInvalidation(self, cluster):
         def _handlePacket(orig, conn, packet, kw={}, handler=None):
@@ -887,6 +909,72 @@ class Test(NEOThreadedTest):
         t.join()
         self.assertEqual(x2.value, 1)
 
+    @with_cluster()
+    def testInternalInvalidation2(self, cluster):
+        # same as testExternalInvalidation3 but with internal invalidations
+        t, c = cluster.getTransaction()
+        x = c.root()[''] = PCounter()
+        t.commit()
+        l1 = threading.Lock(); l1.acquire()
+        l2 = threading.Lock(); l2.acquire()
+        def sync(orig):
+            orig()
+            l2.release()
+            l1.acquire()
+        def raceBeforeInvalidateZODB(orig, transaction, f):
+            def callback(tid):
+                l1.release()
+                begin1.join()
+                f(tid)
+            return orig(transaction, callback)
+        def raceAfterInvalidateZODB(orig, transaction, f):
+            def callback(tid):
+                f(tid)
+                l1.release()
+                begin1.join()
+            return orig(transaction, callback)
+        class CacheLock(object):
+            def __init__(self):
+                self._lock = client._cache_lock
+            def __enter__(self):
+                self._lock.acquire()
+            def __exit__(self, t, v, tb):
+                self._lock.release()
+                p.revert()
+                load1.start()
+                l1.acquire()
+        def _loadFromStorage(orig, *args):
+            l1.release()
+            return orig(*args)
+        client = cluster.client
+        t2, c2 = cluster.getTransaction()
+        x2 = c2.root()['']
+        x2.value = 1
+        with Patch(client, tpc_finish=raceBeforeInvalidateZODB):
+            with Patch(client, sync=sync):
+                begin1 = self.newThread(t.begin)
+                l2.acquire()
+            t2.commit()
+        self.assertEqual(x.value, 0)
+        x._p_deactivate()
+        # On ZODB≥5, the following check would fail
+        # if tpc_finish updated app.last_tid earlier.
+        self.assertEqual(x.value, 0)
+        t.begin()
+        self.assertEqual(x.value, 1)
+        x2.value = big = 'x' * cluster.cache_size # force load from storage
+        with Patch(client, _cache_lock=CacheLock()) as p, \
+             Patch(client, _loadFromStorage=_loadFromStorage), \
+             Patch(client, tpc_finish=raceAfterInvalidateZODB):
+            with Patch(client, sync=sync):
+                begin1 = self.newThread(t.begin)
+                l2.acquire()
+            load1 = self.newPausedThread(lambda: x.value)
+            t2.commit()
+        # On ZODB<5, the following check would fail
+        # if tpc_finish updated app.last_tid later.
+        self.assertEqual(load1.join(), big)
+
     @with_cluster()
     def testExternalInvalidation(self, cluster):
         # Initialize objects
@@ -952,6 +1040,8 @@ class Test(NEOThreadedTest):
         t.join()
         self.assertEqual(x2.value, 1)
         self.assertEqual(x1.value, 0)
+        self.assertEqual((x2._p_serial, x1._p_serial),
+            cluster.client._cache.load(x1._p_oid, x1._p_serial)[1:])
 
         def invalidations(conn):
             try:
@@ -989,7 +1079,7 @@ class Test(NEOThreadedTest):
         x = r[''] = PCounter()
         t.commit()
         tid1 = x._p_serial
-        nonlocal_ = [0, 1]
+        nonlocal_ = [0, 0, 0]
         l1 = threading.Lock(); l1.acquire()
         l2 = threading.Lock(); l2.acquire()
         def invalidateObjects(orig, *args):
@@ -999,27 +1089,136 @@ class Test(NEOThreadedTest):
             nonlocal_[0] += 1
             if nonlocal_[0] == 2:
                 l2.release()
-        def _cache_lock_release(orig):
-            orig()
-            if nonlocal_[1]:
-                nonlocal_[1] = 0
+        class CacheLock(object):
+            def __init__(self, client):
+                self._lock = client._cache_lock
+            def __enter__(self):
+                self._lock.acquire()
+            def __exit__(self, t, v, tb):
+                count = nonlocal_[1]
+                nonlocal_[1] = count + 1
+                self._lock.release()
+                if count == 0:
+                    load_same.start()
+                    l2.acquire()
+                elif count == 1:
+                    load_other.start()
+        def _loadFromStorage(orig, *args):
+            count = nonlocal_[2]
+            nonlocal_[2] = count + 1
+            if not count:
                 l1.release()
-                l2.acquire()
+            return orig(*args)
         with cluster.newClient() as client, \
              Patch(client.notifications_handler,
                    invalidateObjects=invalidateObjects):
             client.sync()
             with cluster.master.filterConnection(client) as mc2:
                 mc2.delayInvalidateObjects()
+                # A first client node (C1) modifies an oid whereas
+                # invalidations to the other node (C2) are delayed.
                 x._p_changed = 1
                 t.commit()
                 tid2 = x._p_serial
+                # C2 loads the most recent revision of this oid (last_tid=tid1).
                 self.assertEqual((tid1, tid2), client.load(x._p_oid)[1:])
+                # C2 poll thread is frozen just before processing invalidation
+                # packet for tid2. C1 modifies something else -> tid3
                 r._p_changed = 1
                 t.commit()
-            with Patch(client, _cache_lock_release=_cache_lock_release):
-                self.assertEqual((tid2, None), client.load(x._p_oid)[1:])
-            self.assertEqual(nonlocal_, [2, 0])
+            self.assertEqual(tid1, client.last_tid)
+            load_same = self.newPausedThread(client.load, x._p_oid)
+            load_other = self.newPausedThread(client.load, r._p_oid)
+            with Patch(client, _cache_lock=CacheLock(client)), \
+                 Patch(client, _loadFromStorage=_loadFromStorage):
+                # 1. Just after having found nothing in cache, the worker
+                #    thread asks the poll thread to get notified about
+                #    invalidations for the loading oid.
+                # <context switch> (l1)
+                # 2. Both invalidations are processed. -> last_tid=tid3
+                # <context switch> (l2)
+                # 3. The worker thread loads before tid3+1.
+                #    The poll thread notified [tid2], which must be ignored.
+                # In parallel, 2 other loads are done (both cache misses):
+                # - one for the same oid, which waits for first load to
+                #   complete and in particular fill cache, in order to
+                #   avoid asking the same data to the storage node
+                # - another for a different oid, which doesn't wait, as shown
+                #   by the fact that it returns an old record (i.e. before any
+                #   invalidation packet is processed)
+                loaded = client.load(x._p_oid)
+            self.assertEqual((tid2, None), loaded[1:])
+            self.assertEqual(loaded, load_same.join())
+            self.assertEqual((tid1, r._p_serial), load_other.join()[1:])
+            # To summarize:
+            # - 3 concurrent loads starting with cache misses
+            # - 2 loads from storage
+            # - 1 load ending with a cache hit
+            self.assertEqual(nonlocal_, [2, 8, 2])
+    @with_cluster(serialized=False)
+    def testExternalInvalidation3(self, cluster):
+        # same as testInternalInvalidation2 but with external invalidations
+        t, c = cluster.getTransaction()
+        x = c.root()[''] = PCounter()
+        t.commit()
+        def sync(orig):
+            orig()
+            ll_sync()
+        def raceBeforeInvalidateZODB(orig, *args):
+            ll_inv()
+            orig(*args)
+        def raceAfterInvalidateZODB(orig, *args):
+            orig(*args)
+            ll_inv()
+        l1 = threading.Lock(); l1.acquire()
+        l2 = threading.Lock(); l2.acquire()
+        class CacheLock(object):
+            def __init__(self):
+                self._lock = client._cache_lock
+            def __enter__(self):
+                self._lock.acquire()
+            def __exit__(self, t, v, tb):
+                self._lock.release()
+                l1.release()
+                l2.acquire()
+        def _loadFromStorage(orig, *args):
+            l2.release()
+            return orig(*args)
+        client = cluster.client
+        with cluster.newClient(1) as db:
+            t2, c2 = cluster.getTransaction(db)
+            x2 = c2.root()['']
+            x2.value = 1
+            with Patch(client._db, invalidate=raceBeforeInvalidateZODB), \
+                 LockLock() as ll_inv:
+                with Patch(client, sync=sync), LockLock() as ll_sync:
+                    begin1 = self.newThread(t.begin)
+                    ll_sync()
+                t2.commit()
+                ll_inv()
+                begin1.join()
+            self.assertEqual(x.value, 0)
+            x._p_deactivate()
+            # On ZODB≥5, the following check would fail if
+            # invalidateObjects updated app.last_tid earlier.
+            self.assertEqual(x.value, 0)
+            t.begin()
+            self.assertEqual(x.value, 1)
+            x2.value = 2
+            with Patch(client, _cache_lock=CacheLock()), \
+                 Patch(client._db, invalidate=raceAfterInvalidateZODB), \
+                 LockLock() as ll_inv:
+                with Patch(client, sync=sync), LockLock() as ll_sync:
+                    begin1 = self.newThread(t.begin)
+                    ll_sync()
+                t2.commit()
+                ll_inv()
+                begin1.join()
+                with Patch(client, _loadFromStorage=_loadFromStorage):
+                    # On ZODB<5, the following check would fail if
+                    # invalidateObjects updated app.last_tid later.
+                    self.assertEqual(x.value, 2)
+
     @with_cluster(storage_count=2, partitions=2)
     def testReadVerifyingStorage(self, cluster):
...
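
The new tests lean heavily on NEO's Patch helper: a context manager that temporarily replaces an attribute and, when the replacement is a function, passes the original as its first argument (which is how sync, tpc_finish and _loadFromStorage are wrapped above). A rough, illustrative reimplementation of the idea, not the real helper (the real one also supports revert(), used in testInternalInvalidation2):

    class Patch(object):
        # Temporarily replace one attribute of obj; restore it on exit.

        def __init__(self, obj, **patched):
            (name, repl), = patched.items()
            self._obj, self._name = obj, name
            orig = self._orig = getattr(obj, name)
            if callable(repl):
                # wrappers receive the original as first argument
                self._new = lambda *args, **kw: repl(orig, *args, **kw)
            else:
                self._new = repl  # plain value, e.g. the CacheLock objects

        def __enter__(self):
            setattr(self._obj, self._name, self._new)
            return self

        def __exit__(self, t, v, tb):
            setattr(self._obj, self._name, self._orig)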
@@ -400,6 +400,22 @@ class ReplicationTests(NEOThreadedTest):
             self.tic()
             self.assertTrue(backup.master.is_alive())
 
+    @with_cluster(master_count=2)
+    def testBackupFromUpstreamWithSecondaryMaster(self, upstream):
+        """
+        Check that the backup master reacts correctly when connecting first
+        to a secondary master of the upstream cluster.
+        """
+        with NEOCluster(upstream=upstream) as backup:
+            primary = upstream.primary_master
+            m, = (m for m in upstream.master_list if m is not primary)
+            backup.master.resetNode(upstream_masters=[m.server])
+            backup.start()
+            backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
+            self.tic()
+            self.assertEqual(backup.neoctl.getClusterState(),
+                             ClusterStates.BACKINGUP)
+
     @backup_test()
     def testCreationUndone(self, backup):
         """
...
@@ -39,6 +39,14 @@ class BasicTests(ZODBTestCase, StorageTestBase, BasicStorage):
         with Patch(threaded, TIC_LOOP=TIC_LOOP()):
             super(BasicTests, self).check_checkCurrentSerialInTransaction()
 
+    # The test expects that both load & lastTransaction would be blocked
+    # as long as the tpc_finish callback has not finished, taking more
+    # than .1 second. ZODB 5.6.0 clarified that lastTransaction() can
+    # return immediately with the previous last TID rather than blocking
+    # until it is allowed to return the new last TID.
+    check_tid_ordering_w_commit = unittest.skip("ZODB PR #316")(
+        BasicStorage.check_tid_ordering_w_commit)
+
 if __name__ == "__main__":
     suite = unittest.makeSuite(BasicTests, 'check')
     unittest.main(defaultTest='suite')
...