Commit 781b4eb5 authored by Julien Muchembled's avatar Julien Muchembled

master: fix crash when a transaction begins while a storage node starts operation

Traceback (most recent call last):
  ...
  File "neo/lib/handler.py", line 72, in dispatch
    method(conn, *args, **kw)
  File "neo/master/handlers/client.py", line 70, in askFinishTransaction
    conn.getPeerId(),
  File "neo/master/transactions.py", line 387, in prepare
    assert node_list, (ready, failed)
AssertionError: (set([]), frozenset([]))

Master log leading to the crash:
  PACKET    #0x0009 StartOperation                 > S1
  PACKET    #0x0004 BeginTransaction               < C1
  DEBUG     Begin <...>
  PACKET    #0x0004 AnswerBeginTransaction         > C1
  PACKET    #0x0001 NotifyReady                    < S1

It was wrong to process BeginTransaction before receiving NotifyReady.

The changes in the storage are cosmetics: the 'ready' attribute has become
redundant with 'operational'.
parent 0fd3b652
...@@ -68,6 +68,7 @@ class Application(BaseApplication): ...@@ -68,6 +68,7 @@ class Application(BaseApplication):
self.autostart = config.getAutostart() self.autostart = config.getAutostart()
self.storage_ready_dict = {} self.storage_ready_dict = {}
self.storage_starting_set = set()
for master_address in config.getMasters(): for master_address in config.getMasters():
self.nm.createMaster(address=master_address) self.nm.createMaster(address=master_address)
...@@ -367,6 +368,9 @@ class Application(BaseApplication): ...@@ -367,6 +368,9 @@ class Application(BaseApplication):
truncate = Packets.Truncate(*e.args) if e.args else None truncate = Packets.Truncate(*e.args) if e.args else None
# Automatic restart except if we truncate or retry to. # Automatic restart except if we truncate or retry to.
self._startup_allowed = not (self.truncate_tid or truncate) self._startup_allowed = not (self.truncate_tid or truncate)
self.storage_readiness = 0
self.storage_ready_dict.clear()
self.storage_starting_set.clear()
node_list = [] node_list = []
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
if node.isStorage() or node.isClient(): if node.isStorage() or node.isClient():
...@@ -574,12 +578,23 @@ class Application(BaseApplication): ...@@ -574,12 +578,23 @@ class Application(BaseApplication):
self.last_transaction = tid self.last_transaction = tid
def setStorageNotReady(self, uuid): def setStorageNotReady(self, uuid):
self.storage_starting_set.discard(uuid)
self.storage_ready_dict.pop(uuid, None) self.storage_ready_dict.pop(uuid, None)
self.tm.executeQueuedEvents()
def setStorageReady(self, uuid): def startStorage(self, node):
node.notify(Packets.StartOperation(self.backup_tid))
uuid = node.getUUID()
assert uuid not in self.storage_starting_set
if uuid not in self.storage_ready_dict: if uuid not in self.storage_ready_dict:
self.storage_readiness = self.storage_ready_dict[uuid] = \ self.storage_starting_set.add(uuid)
self.storage_readiness + 1
def setStorageReady(self, uuid):
self.storage_starting_set.remove(uuid)
assert uuid not in self.storage_ready_dict, self.storage_ready_dict
self.storage_readiness = self.storage_ready_dict[uuid] = \
self.storage_readiness + 1
self.tm.executeQueuedEvents()
def isStorageReady(self, uuid): def isStorageReady(self, uuid):
return uuid in self.storage_ready_dict return uuid in self.storage_ready_dict
......
...@@ -136,7 +136,3 @@ class BaseServiceHandler(MasterHandler): ...@@ -136,7 +136,3 @@ class BaseServiceHandler(MasterHandler):
app.broadcastPartitionChanges(app.pt.outdate(node)) app.broadcastPartitionChanges(app.pt.outdate(node))
if not app.pt.operational(): if not app.pt.operational():
raise StoppedOperation raise StoppedOperation
def notifyReady(self, conn):
self.app.setStorageReady(conn.getUUID())
...@@ -141,10 +141,9 @@ class AdministrationHandler(MasterHandler): ...@@ -141,10 +141,9 @@ class AdministrationHandler(MasterHandler):
for node in app.nm.getStorageList() for node in app.nm.getStorageList()
if node.isPending() and node.getUUID() in uuid_list)) if node.isPending() and node.getUUID() in uuid_list))
if node_list: if node_list:
p = Packets.StartOperation(bool(app.backup_tid))
for node in node_list: for node in node_list:
node.setRunning() node.setRunning()
node.notify(p) app.startStorage(node)
app.broadcastNodesInformation(node_list) app.broadcastNodesInformation(node_list)
conn.answer(Errors.Ack('Nodes added: %s' % conn.answer(Errors.Ack('Nodes added: %s' %
', '.join(uuid_str(x.getUUID()) for x in node_list))) ', '.join(uuid_str(x.getUUID()) for x in node_list)))
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib.handler import DelayEvent
from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID, Errors from neo.lib.protocol import NodeStates, Packets, ProtocolError, MAX_TID, Errors
from ..app import monotonic_time from ..app import monotonic_time
from . import MasterHandler from . import MasterHandler
...@@ -44,6 +45,11 @@ class ClientServiceHandler(MasterHandler): ...@@ -44,6 +45,11 @@ class ClientServiceHandler(MasterHandler):
A client request a TID, nothing is kept about it until the finish. A client request a TID, nothing is kept about it until the finish.
""" """
app = self.app app = self.app
# Delay new transaction as long as we are waiting for NotifyReady
# answers, otherwise we can know if the client is expected to commit
# the transaction in full to all these storage nodes.
if app.storage_starting_set:
raise DelayEvent
node = app.nm.getByUUID(conn.getUUID()) node = app.nm.getByUUID(conn.getUUID())
tid = app.tm.begin(node, app.storage_readiness, tid) tid = app.tm.begin(node, app.storage_readiness, tid)
conn.answer(Packets.AnswerBeginTransaction(tid)) conn.answer(Packets.AnswerBeginTransaction(tid))
...@@ -52,7 +58,7 @@ class ClientServiceHandler(MasterHandler): ...@@ -52,7 +58,7 @@ class ClientServiceHandler(MasterHandler):
conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids))) conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids)))
def getEventQueue(self): def getEventQueue(self):
# for failedVote # for askBeginTransaction & failedVote
return self.app.tm return self.app.tm
def failedVote(self, conn, *args): def failedVote(self, conn, *args):
......
...@@ -28,8 +28,12 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -28,8 +28,12 @@ class StorageServiceHandler(BaseServiceHandler):
app = self.app app = self.app
if new: if new:
super(StorageServiceHandler, self).connectionCompleted(conn, new) super(StorageServiceHandler, self).connectionCompleted(conn, new)
if app.nm.getByUUID(conn.getUUID()).isRunning(): # node may be PENDING node = app.nm.getByUUID(conn.getUUID())
conn.notify(Packets.StartOperation(app.backup_tid)) if node.isRunning(): # node may be PENDING
app.startStorage(node)
def notifyReady(self, conn):
self.app.setStorageReady(conn.getUUID())
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
app = self.app app = self.app
......
...@@ -69,9 +69,6 @@ class Application(BaseApplication): ...@@ -69,9 +69,6 @@ class Application(BaseApplication):
# operation related data # operation related data
self.operational = False self.operational = False
# ready is True when operational and got all informations
self.ready = False
self.dm.setup(reset=config.getReset()) self.dm.setup(reset=config.getReset())
self.loadConfiguration() self.loadConfiguration()
...@@ -167,10 +164,9 @@ class Application(BaseApplication): ...@@ -167,10 +164,9 @@ class Application(BaseApplication):
# Connect to a primary master node, verify data, and # Connect to a primary master node, verify data, and
# start the operation. This cycle will be executed permanently, # start the operation. This cycle will be executed permanently,
# until the user explicitly requests a shutdown. # until the user explicitly requests a shutdown.
self.ready = False self.operational = False
while True: while True:
self.cluster_state = None self.cluster_state = None
self.operational = False
if self.master_node is None: if self.master_node is None:
# look for the primary master # look for the primary master
self.connectToPrimary() self.connectToPrimary()
...@@ -190,7 +186,7 @@ class Application(BaseApplication): ...@@ -190,7 +186,7 @@ class Application(BaseApplication):
except PrimaryFailure, msg: except PrimaryFailure, msg:
logging.error('primary master is down: %s', msg) logging.error('primary master is down: %s', msg)
finally: finally:
self.ready = False self.operational = False
# When not ready, we reject any incoming connection so for # When not ready, we reject any incoming connection so for
# consistency, we also close any connection except that to the # consistency, we also close any connection except that to the
# master. This includes connections to other storage nodes and any # master. This includes connections to other storage nodes and any
...@@ -239,7 +235,6 @@ class Application(BaseApplication): ...@@ -239,7 +235,6 @@ class Application(BaseApplication):
self.master_conn.setHandler(initialization.InitializationHandler(self)) self.master_conn.setHandler(initialization.InitializationHandler(self))
while not self.operational: while not self.operational:
_poll() _poll()
self.ready = True
self.master_conn.notify(Packets.NotifyReady()) self.master_conn.notify(Packets.NotifyReady())
def doOperation(self): def doOperation(self):
......
...@@ -36,7 +36,7 @@ class IdentificationHandler(EventHandler): ...@@ -36,7 +36,7 @@ class IdentificationHandler(EventHandler):
self.checkClusterName(name) self.checkClusterName(name)
app = self.app app = self.app
# reject any incoming connections if not ready # reject any incoming connections if not ready
if not app.ready: if not app.operational:
raise NotReadyError raise NotReadyError
if uuid is None: if uuid is None:
if node_type != NodeTypes.STORAGE: if node_type != NodeTypes.STORAGE:
......
...@@ -46,7 +46,7 @@ class StorageOperationHandler(EventHandler): ...@@ -46,7 +46,7 @@ class StorageOperationHandler(EventHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
app = self.app app = self.app
if app.ready and conn.isClient(): if app.operational and conn.isClient():
# XXX: Connection and Node should merged. # XXX: Connection and Node should merged.
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid: if uuid:
...@@ -62,7 +62,7 @@ class StorageOperationHandler(EventHandler): ...@@ -62,7 +62,7 @@ class StorageOperationHandler(EventHandler):
# Client # Client
def connectionFailed(self, conn): def connectionFailed(self, conn):
if self.app.ready: if self.app.operational:
self.app.replicator.abort() self.app.replicator.abort()
def _acceptIdentification(self, node, *args): def _acceptIdentification(self, node, *args):
......
...@@ -286,7 +286,8 @@ class Replicator(object): ...@@ -286,7 +286,8 @@ class Replicator(object):
if self.current_partition is not None or not self.replicate_dict: if self.current_partition is not None or not self.replicate_dict:
return return
app = self.app app = self.app
assert app.master_conn and app.ready, (app.master_conn, app.ready) assert app.master_conn and app.operational, (
app.master_conn, app.operational)
# Start replicating the partition which is furthest behind, # Start replicating the partition which is furthest behind,
# to increase the overall backup_tid as soon as possible. # to increase the overall backup_tid as soon as possible.
# Then prefer a partition with no unfinished transaction. # Then prefer a partition with no unfinished transaction.
......
...@@ -28,7 +28,7 @@ class StorageIdentificationHandlerTests(NeoUnitTestBase): ...@@ -28,7 +28,7 @@ class StorageIdentificationHandlerTests(NeoUnitTestBase):
config = self.getStorageConfiguration(master_number=1) config = self.getStorageConfiguration(master_number=1)
self.app = Application(config) self.app = Application(config)
self.app.name = 'NEO' self.app.name = 'NEO'
self.app.ready = True self.app.operational = True
self.app.pt = PartitionTable(4, 1) self.app.pt = PartitionTable(4, 1)
self.identification = IdentificationHandler(self.app) self.identification = IdentificationHandler(self.app)
......
...@@ -495,6 +495,34 @@ class Test(NEOThreadedTest): ...@@ -495,6 +495,34 @@ class Test(NEOThreadedTest):
self.tic() self.tic()
self.assertPartitionTable(cluster, 'UUO', s1) self.assertPartitionTable(cluster, 'UUO', s1)
@with_cluster()
def testStartOperation(self, cluster):
t, c = cluster.getTransaction()
c.root()._p_changed = 1
cluster.storage.stop()
cluster.join(cluster.storage_list)
cluster.storage.resetNode()
delayed = []
def delayConnection(conn, packet):
return conn in delayed
def startOperation(orig, self, conn, backup):
assert not delayed, delayed
delayed.append(conn)
orig(self, conn, backup)
def askBeginTransaction(orig, *args):
f.discard(delayConnection)
orig(*args)
with ConnectionFilter() as f, \
Patch(InitializationHandler, startOperation=startOperation), \
Patch(cluster.master.client_service_handler,
askBeginTransaction=askBeginTransaction) as p:
f.add(delayConnection)
cluster.storage.start()
self.tic()
t.commit()
self.assertNotIn(delayConnection, f)
self.assertTrue(delayed)
@with_cluster(replicas=1, partitions=10) @with_cluster(replicas=1, partitions=10)
def testRestartWithMissingStorage(self, cluster): def testRestartWithMissingStorage(self, cluster):
# translated from neo.tests.functional.testStorage.StorageTest # translated from neo.tests.functional.testStorage.StorageTest
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment