Commit 36b2d141 authored by Julien Muchembled's avatar Julien Muchembled

master,client: ignore notifications before complete initialization

A backup master crashed with the following traceback after a reconnection:

    Traceback (most recent call last):
      File "neo/master/app.py", line 127, in run
        self._run()
      File "neo/master/app.py", line 147, in _run
        self.playPrimaryRole()
      File "neo/master/app.py", line 348, in playPrimaryRole
        self.backup_app.provideService())
      File "neo/master/backup_app.py", line 123, in provideService
        poll(1)
      File "neo/lib/event.py", line 126, in poll
        to_process.process()
      File "neo/lib/connection.py", line 500, in process
        self._handlers.handle(self, self._queue.pop(0))
      File "neo/lib/connection.py", line 110, in handle
        self._handle(connection, packet)
      File "neo/lib/connection.py", line 125, in _handle
        handler.packetReceived(connection, packet)
      File "neo/lib/handler.py", line 117, in packetReceived
        self.dispatch(*args)
      File "neo/lib/handler.py", line 66, in dispatch
        method(conn, *args, **kw)
      File "neo/master/handlers/backup.py", line 52, in invalidateObjects
        app.invalidatePartitions(tid, partition_set)
      File "neo/master/backup_app.py", line 257, in invalidatePartitions
        self.triggerBackup(node)
      File "neo/master/backup_app.py", line 281, in triggerBackup
        assert cell_list, offset
    AssertionError: 0
parent 02292584
Pipeline #4103 skipped
...@@ -220,6 +220,7 @@ class Application(ThreadedApplication): ...@@ -220,6 +220,7 @@ class Application(ThreadedApplication):
ask = self._ask ask = self._ask
handler = self.primary_bootstrap_handler handler = self.primary_bootstrap_handler
while 1: while 1:
self.ignore_invalidations = True
# Get network connection to primary master # Get network connection to primary master
while 1: while 1:
if self.primary_master_node is not None: if self.primary_master_node is not None:
......
...@@ -120,6 +120,7 @@ class PrimaryNotificationsHandler(MTEventHandler): ...@@ -120,6 +120,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
db = app.getDB() db = app.getDB()
db is None or db.invalidateCache() db is None or db.invalidateCache()
app.last_tid = ltid app.last_tid = ltid
app.ignore_invalidations = False
def answerTransactionFinished(self, conn, _, tid, callback, cache_dict): def answerTransactionFinished(self, conn, _, tid, callback, cache_dict):
app = self.app app = self.app
...@@ -159,6 +160,8 @@ class PrimaryNotificationsHandler(MTEventHandler): ...@@ -159,6 +160,8 @@ class PrimaryNotificationsHandler(MTEventHandler):
def invalidateObjects(self, conn, tid, oid_list): def invalidateObjects(self, conn, tid, oid_list):
app = self.app app = self.app
if app.ignore_invalidations:
return
app.last_tid = tid app.last_tid = tid
app._cache_lock_acquire() app._cache_lock_acquire()
try: try:
......
...@@ -113,6 +113,7 @@ class BackupApplication(object): ...@@ -113,6 +113,7 @@ class BackupApplication(object):
del bootstrap, node del bootstrap, node
if num_partitions != pt.getPartitions(): if num_partitions != pt.getPartitions():
raise RuntimeError("inconsistent number of partitions") raise RuntimeError("inconsistent number of partitions")
self.ignore_invalidations = True
self.pt = PartitionTable(num_partitions, num_replicas) self.pt = PartitionTable(num_partitions, num_replicas)
conn.setHandler(BackupHandler(self)) conn.setHandler(BackupHandler(self))
conn.ask(Packets.AskPartitionTable()) conn.ask(Packets.AskPartitionTable())
......
...@@ -29,7 +29,8 @@ class BackupHandler(EventHandler): ...@@ -29,7 +29,8 @@ class BackupHandler(EventHandler):
self.app.pt.load(ptid, row_list, self.app.nm) self.app.pt.load(ptid, row_list, self.app.nm)
def notifyPartitionChanges(self, conn, ptid, cell_list): def notifyPartitionChanges(self, conn, ptid, cell_list):
self.app.pt.update(ptid, cell_list, self.app.nm) if self.app.pt.filled():
self.app.pt.update(ptid, cell_list, self.app.nm)
def answerLastTransaction(self, conn, tid): def answerLastTransaction(self, conn, tid):
app = self.app app = self.app
...@@ -37,9 +38,12 @@ class BackupHandler(EventHandler): ...@@ -37,9 +38,12 @@ class BackupHandler(EventHandler):
app.invalidatePartitions(tid, set(xrange(app.pt.getPartitions()))) app.invalidatePartitions(tid, set(xrange(app.pt.getPartitions())))
else: # upstream DB is empty else: # upstream DB is empty
assert app.app.getLastTransaction() == tid assert app.app.getLastTransaction() == tid
app.ignore_invalidations = False
def invalidateObjects(self, conn, tid, oid_list): def invalidateObjects(self, conn, tid, oid_list):
app = self.app app = self.app
if app.ignore_invalidations:
return
getPartition = app.app.pt.getPartition getPartition = app.app.pt.getPartition
partition_set = set(map(getPartition, oid_list)) partition_set = set(map(getPartition, oid_list))
partition_set.add(getPartition(tid)) partition_set.add(getPartition(tid))
......
...@@ -24,6 +24,7 @@ from collections import defaultdict ...@@ -24,6 +24,7 @@ from collections import defaultdict
from functools import wraps from functools import wraps
from neo.lib import logging from neo.lib import logging
from neo.client.exception import NEOStorageError from neo.client.exception import NEOStorageError
from neo.master.handlers.backup import BackupHandler
from neo.storage.checker import CHECK_COUNT from neo.storage.checker import CHECK_COUNT
from neo.storage.replicator import Replicator from neo.storage.replicator import Replicator
from neo.lib.connector import SocketConnector from neo.lib.connector import SocketConnector
...@@ -307,6 +308,31 @@ class ReplicationTests(NEOThreadedTest): ...@@ -307,6 +308,31 @@ class ReplicationTests(NEOThreadedTest):
self.tic() self.tic()
self.assertEqual(1, self.checkBackup(backup)) self.assertEqual(1, self.checkBackup(backup))
def testBackupEarlyInvalidation(self):
"""
The backup master must ignore notification before being fully
initialized.
"""
upstream = NEOCluster()
try:
upstream.start()
backup = NEOCluster(upstream=upstream)
try:
backup.start()
with ConnectionFilter() as f:
f.add(lambda conn, packet:
isinstance(packet, Packets.AskPartitionTable) and
isinstance(conn.getHandler(), BackupHandler))
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
upstream.importZODB()(1)
self.tic()
self.tic()
self.assertTrue(backup.master.isAlive())
finally:
backup.stop()
finally:
upstream.stop()
def testSafeTweak(self): def testSafeTweak(self):
""" """
Check that tweak always tries to keep a minimum of (replicas + 1) Check that tweak always tries to keep a minimum of (replicas + 1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment