Commit eb667c7c authored by Vincent Pelletier's avatar Vincent Pelletier

Merge PrimaryMaster and RequestIdentification content.

Preserve AskPrimary/AnswerPrimary for neoctl/neoadmin exchange only.
Fixes several race conditions during election.
parent 98b6210c
...@@ -78,7 +78,7 @@ class AdminEventHandler(EventHandler): ...@@ -78,7 +78,7 @@ class AdminEventHandler(EventHandler):
@check_primary_master @check_primary_master
def askPrimary(self, conn): def askPrimary(self, conn):
master_node = self.app.master_node master_node = self.app.master_node
conn.answer(Packets.AnswerPrimary(master_node.getUUID(), [])) conn.answer(Packets.AnswerPrimary(master_node.getUUID()))
addPendingNodes = forward_ask(Packets.AddPendingNodes) addPendingNodes = forward_ask(Packets.AddPendingNodes)
setClusterState = forward_ask(Packets.SetClusterState) setClusterState = forward_ask(Packets.SetClusterState)
......
...@@ -281,7 +281,6 @@ class Application(object): ...@@ -281,7 +281,6 @@ class Application(object):
neo.lib.logging.debug('connecting to primary master...') neo.lib.logging.debug('connecting to primary master...')
ready = False ready = False
nm = self.nm nm = self.nm
packet = Packets.AskPrimary()
while not ready: while not ready:
# Get network connection to primary master # Get network connection to primary master
index = 0 index = 0
...@@ -315,7 +314,8 @@ class Application(object): ...@@ -315,7 +314,8 @@ class Application(object):
self.trying_master_node) self.trying_master_node)
continue continue
try: try:
self._ask(conn, packet, self._ask(conn, Packets.RequestIdentification(
NodeTypes.CLIENT, self.uuid, None, self.name),
handler=self.primary_bootstrap_handler) handler=self.primary_bootstrap_handler)
except ConnectionClosed: except ConnectionClosed:
continue continue
...@@ -342,16 +342,6 @@ class Application(object): ...@@ -342,16 +342,6 @@ class Application(object):
neo.lib.logging.info('Initializing from master') neo.lib.logging.info('Initializing from master')
ask = self._ask ask = self._ask
handler = self.primary_bootstrap_handler handler = self.primary_bootstrap_handler
# Identify to primary master and request initial data
p = Packets.RequestIdentification(NodeTypes.CLIENT, self.uuid, None,
self.name)
assert self.master_conn is None, self.master_conn
while self.master_conn is None:
ask(conn, p, handler=handler)
if conn.getUUID() is None:
# Node identification was refused by master, it is considered
# as the primary as long as we are connected to it.
time.sleep(1)
ask(conn, Packets.AskNodeInformation(), handler=handler) ask(conn, Packets.AskNodeInformation(), handler=handler)
ask(conn, Packets.AskPartitionTable(), handler=handler) ask(conn, Packets.AskPartitionTable(), handler=handler)
return self.pt.operational() return self.pt.operational()
......
...@@ -28,36 +28,27 @@ class PrimaryBootstrapHandler(AnswerBaseHandler): ...@@ -28,36 +28,27 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
app = self.app app = self.app
app.trying_master_node = None app.trying_master_node = None
def acceptIdentification(self, conn, node_type, def acceptIdentification(self, conn, node_type, uuid, num_partitions,
uuid, num_partitions, num_replicas, your_uuid): num_replicas, your_uuid, primary_uuid, known_master_list):
app = self.app app = self.app
# this must be a master node # this must be a master node
if node_type != NodeTypes.MASTER: if node_type != NodeTypes.MASTER:
conn.close() conn.close()
return return
# the master must give an UUID
if your_uuid is None:
raise ProtocolError('No UUID supplied')
app.uuid = your_uuid
neo.lib.logging.info('Got an UUID: %s', dump(app.uuid))
app.nm.getByAddress(conn.getAddress()).setUUID(uuid)
# Always create partition table
app.pt = PartitionTable(num_partitions, num_replicas)
app.master_conn = conn
def answerPrimary(self, conn, primary_uuid,
known_master_list):
app = self.app
# Register new master nodes. # Register new master nodes.
for address, uuid in known_master_list: found = False
n = app.nm.getByAddress(address) conn_address = conn.getAddress()
for node_address, node_uuid in known_master_list:
if node_address == conn_address:
assert uuid == node_uuid, (dump(uuid), dump(node_uuid))
found = True
n = app.nm.getByAddress(node_address)
if n is None: if n is None:
app.nm.createMaster(address=address) n = app.nm.createMaster(address=node_address)
if uuid is not None and n.getUUID() != uuid: if node_uuid is not None and n.getUUID() != node_uuid:
n.setUUID(uuid) n.setUUID(node_uuid)
assert found, (conn, dump(uuid), known_master_list)
if primary_uuid is not None: if primary_uuid is not None:
primary_node = app.nm.getByUUID(primary_uuid) primary_node = app.nm.getByUUID(primary_uuid)
...@@ -66,6 +57,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler): ...@@ -66,6 +57,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
# is old. So ignore it. # is old. So ignore it.
neo.lib.logging.warning('Unknown primary master UUID: %s. ' \ neo.lib.logging.warning('Unknown primary master UUID: %s. ' \
'Ignoring.' % dump(primary_uuid)) 'Ignoring.' % dump(primary_uuid))
return
else: else:
if app.trying_master_node is not primary_node: if app.trying_master_node is not primary_node:
app.trying_master_node = None app.trying_master_node = None
...@@ -79,6 +71,17 @@ class PrimaryBootstrapHandler(AnswerBaseHandler): ...@@ -79,6 +71,17 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
app.trying_master_node = None app.trying_master_node = None
conn.close() conn.close()
return
# the master must give an UUID
if your_uuid is None:
raise ProtocolError('No UUID supplied')
app.uuid = your_uuid
neo.lib.logging.info('Got an UUID: %s', dump(app.uuid))
# Always create partition table
app.pt = PartitionTable(num_partitions, num_replicas)
app.master_conn = conn
def answerPartitionTable(self, conn, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
assert row_list assert row_list
......
...@@ -48,7 +48,10 @@ class StorageBootstrapHandler(AnswerBaseHandler): ...@@ -48,7 +48,10 @@ class StorageBootstrapHandler(AnswerBaseHandler):
raise NodeNotReady(message) raise NodeNotReady(message)
def acceptIdentification(self, conn, node_type, def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid): uuid, num_partitions, num_replicas, your_uuid, primary_uuid,
master_list):
assert primary_uuid == self.app.primary_master_node.getUUID(), (
dump(primary_uuid), dump(self.app.primary_master_node.getUUID()))
# this must be a storage node # this must be a storage node
if node_type != NodeTypes.STORAGE: if node_type != NodeTypes.STORAGE:
conn.close() conn.close()
......
...@@ -46,6 +46,15 @@ class BootstrapManager(EventHandler): ...@@ -46,6 +46,15 @@ class BootstrapManager(EventHandler):
self.num_partitions = None self.num_partitions = None
self.current = None self.current = None
def notifyNodeInformation(self, conn, node_list):
pass
def announcePrimary(self, conn):
# We found the primary master early enough to be notified of election
# end. Lucky. Anyway, we must carry on with identification request, so
# nothing to do here.
pass
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
""" """
Triggered when the network connection is successful. Triggered when the network connection is successful.
...@@ -53,7 +62,8 @@ class BootstrapManager(EventHandler): ...@@ -53,7 +62,8 @@ class BootstrapManager(EventHandler):
""" """
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
self.current.setRunning() self.current.setRunning()
conn.ask(Packets.AskPrimary()) conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.name))
def connectionFailed(self, conn): def connectionFailed(self, conn):
""" """
...@@ -79,12 +89,8 @@ class BootstrapManager(EventHandler): ...@@ -79,12 +89,8 @@ class BootstrapManager(EventHandler):
""" """
conn.close() conn.close()
def answerPrimary(self, conn, primary_uuid, known_master_list): def acceptIdentification(self, conn, node_type, uuid, num_partitions,
""" num_replicas, your_uuid, primary_uuid, known_master_list):
A master answer who's the primary. If it's another node, connect to it.
If it's itself then the primary is successfully found, ask
identification.
"""
nm = self.app.nm nm = self.app.nm
# Register new master nodes. # Register new master nodes.
...@@ -104,14 +110,6 @@ class BootstrapManager(EventHandler): ...@@ -104,14 +110,6 @@ class BootstrapManager(EventHandler):
return return
neo.lib.logging.info('connected to a primary master node') neo.lib.logging.info('connected to a primary master node')
conn.ask(Packets.RequestIdentification(self.node_type,
self.uuid, self.server, self.name))
def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid):
"""
The primary master has accepted the node.
"""
self.num_partitions = num_partitions self.num_partitions = num_partitions
self.num_replicas = num_replicas self.num_replicas = num_replicas
if self.uuid != your_uuid: if self.uuid != your_uuid:
......
...@@ -692,6 +692,13 @@ class RequestIdentification(Packet): ...@@ -692,6 +692,13 @@ class RequestIdentification(Packet):
PNumber('num_partitions'), PNumber('num_partitions'),
PNumber('num_replicas'), PNumber('num_replicas'),
PUUID('your_uuid'), PUUID('your_uuid'),
PUUID('primary_uuid'),
PList('known_master_list',
PStruct('master',
PAddress('address'),
PUUID('uuid'),
),
),
) )
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
...@@ -706,19 +713,10 @@ class RequestIdentification(Packet): ...@@ -706,19 +713,10 @@ class RequestIdentification(Packet):
class PrimaryMaster(Packet): class PrimaryMaster(Packet):
""" """
Ask a current primary master node. This must be the second message when Ask current primary master's uuid. CTL -> A.
connecting to a master node. Any -> M.
Reply to Ask Primary Master. This message includes a list of known master
nodes to make sure that a peer has the same information. M -> Any.
""" """
_answer = PStruct('answer_primary', _answer = PStruct('answer_primary',
PUUID('primary_uuid'), PUUID('primary_uuid'),
PList('known_master_list',
PStruct('master',
PAddress('address'),
PUUID('uuid'),
),
),
) )
class AnnouncePrimary(Packet): class AnnouncePrimary(Packet):
......
...@@ -297,31 +297,32 @@ class Application(object): ...@@ -297,31 +297,32 @@ class Application(object):
def playPrimaryRole(self): def playPrimaryRole(self):
neo.lib.logging.info( neo.lib.logging.info(
'play the primary role with %r', self.listening_conn) 'play the primary role with %r', self.listening_conn)
packet = Packets.AnnouncePrimary()
for conn in self.em.getClientList():
conn.notify(packet)
conn.abort()
self.listening_conn.setHandler(
identification.IdentificationHandler(self))
em = self.em em = self.em
nm = self.nm packet = Packets.AnnouncePrimary()
# Close all remaining connections to other masters,
# for the same reason as in playSecondaryRole.
for conn in em.getConnectionList(): for conn in em.getConnectionList():
conn_uuid = conn.getUUID() if conn.isListening():
if conn_uuid is not None: conn.setHandler(identification.IdentificationHandler(self))
node = nm.getByUUID(conn_uuid) else:
assert node is not None conn.notify(packet)
assert node.isMaster() and not conn.isClient() # Primary master should rather establish connections to all
assert node.isUnknown() # secondaries, rather than the other way around. This requires
# this may trigger 'unexpected answer' warnings on remote side # a bit more work when a new master joins a cluster but makes
conn.close() # it easier to resolve UUID conflicts with minimal cluster
# impact, and ensure primary master unicity (primary masters
# become noisy, in that they actively try to maintain
# connections to all other master nodes, so duplicate
# primaries will eventually get in touch with each other and
# resolve the situation with a duel).
# TODO: only abort client connections, don't close server
# connections as we want to have them in the end. Secondary
# masters will reconnect nevertheless, but it's dirty.
# Currently, it's not trivial to preserve connected nodes,
# because of poor node status tracking during election.
conn.abort()
# If I know any storage node, make sure that they are not in the # If I know any storage node, make sure that they are not in the
# running state, because they are not connected at this stage. # running state, because they are not connected at this stage.
for node in nm.getStorageList(): for node in self.nm.getStorageList():
if node.isRunning(): if node.isRunning():
node.setTemporarilyDown() node.setTemporarilyDown()
......
...@@ -17,14 +17,26 @@ ...@@ -17,14 +17,26 @@
import neo import neo
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import NodeTypes, NodeStates, Packets from neo.lib.protocol import (NodeTypes, NodeStates, Packets,
BrokenNodeDisallowedError,
)
from neo.lib.util import dump from neo.lib.util import dump
class MasterHandler(EventHandler): class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
def askPrimary(self, conn): def requestIdentification(self, conn, node_type, uuid, address, name):
self.checkClusterName(name)
app = self.app app = self.app
node = app.nm.getByUUID(uuid)
if node:
assert node_type is not NodeTypes.MASTER or node.getAddress() in (
address, None), (node, address)
if node.isBroken():
raise BrokenNodeDisallowedError
else:
node = app.nm.getByAddress(address)
peer_uuid = self._setupNode(conn, node_type, uuid, address, node)
if app.primary: if app.primary:
primary_uuid = app.uuid primary_uuid = app.uuid
elif app.primary_master_node is not None: elif app.primary_master_node is not None:
...@@ -32,12 +44,17 @@ class MasterHandler(EventHandler): ...@@ -32,12 +44,17 @@ class MasterHandler(EventHandler):
else: else:
primary_uuid = None primary_uuid = None
known_master_list = [(app.server, app.uuid, )] known_master_list = [(app.server, app.uuid)]
for n in app.nm.getMasterList(): for n in app.nm.getMasterList():
if n.isBroken(): if n.isBroken():
continue continue
known_master_list.append((n.getAddress(), n.getUUID(), )) known_master_list.append((n.getAddress(), n.getUUID()))
conn.answer(Packets.AnswerPrimary( conn.answer(Packets.AcceptIdentification(
NodeTypes.MASTER,
app.uuid,
app.pt.getPartitions(),
app.pt.getReplicas(),
peer_uuid,
primary_uuid, primary_uuid,
known_master_list), known_master_list),
) )
......
...@@ -19,13 +19,37 @@ import neo.lib ...@@ -19,13 +19,37 @@ import neo.lib
from neo.lib.protocol import NodeTypes, NodeStates, Packets from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.lib.protocol import NotReadyError, ProtocolError, \ from neo.lib.protocol import NotReadyError, ProtocolError, \
UnexpectedPacketError UnexpectedPacketError
from neo.lib.protocol import BrokenNodeDisallowedError
from neo.lib.exception import ElectionFailure from neo.lib.exception import ElectionFailure
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.util import dump from neo.lib.util import dump
from . import MasterHandler from . import MasterHandler
class ClientElectionHandler(EventHandler): def elect(app, peer_uuid, peer_address):
if app.uuid < peer_uuid:
app.primary = False
app.negotiating_master_node_set.discard(peer_address)
class BaseElectionHandler(EventHandler):
def reelectPrimary(self, conn):
raise ElectionFailure, 'reelection requested'
def announcePrimary(self, conn):
uuid = conn.getUUID()
if uuid is None:
raise ProtocolError('Not identified')
app = self.app
if app.primary:
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
node = app.nm.getByUUID(uuid)
app.primary = False
app.primary_master_node = node
app.negotiating_master_node_set.clear()
neo.lib.logging.info('%s is the primary', node)
class ClientElectionHandler(BaseElectionHandler):
def connectionFailed(self, conn): def connectionFailed(self, conn):
addr = conn.getAddress() addr = conn.getAddress()
...@@ -38,21 +62,30 @@ class ClientElectionHandler(EventHandler): ...@@ -38,21 +62,30 @@ class ClientElectionHandler(EventHandler):
super(ClientElectionHandler, self).connectionFailed(conn) super(ClientElectionHandler, self).connectionFailed(conn)
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
conn.ask(Packets.AskPrimary()) app = self.app
conn.ask(Packets.RequestIdentification(
NodeTypes.MASTER,
app.uuid,
app.server,
app.name,
))
super(ClientElectionHandler, self).connectionCompleted(conn) super(ClientElectionHandler, self).connectionCompleted(conn)
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
# Retry connection. Either the node just died (and we will end up in
# connectionFailed) or it just got elected (and we must not ignore
# that node).
addr = conn.getAddress() addr = conn.getAddress()
self.app.unconnected_master_node_set.add(addr)
self.app.negotiating_master_node_set.discard(addr) self.app.negotiating_master_node_set.discard(addr)
def acceptIdentification(self, conn, node_type, def acceptIdentification(self, conn, node_type, peer_uuid, num_partitions,
uuid, num_partitions, num_replicas, your_uuid): num_replicas, your_uuid, primary_uuid, known_master_list):
app = self.app app = self.app
node = app.nm.getByAddress(conn.getAddress())
if node_type != NodeTypes.MASTER: if node_type != NodeTypes.MASTER:
# The peer is not a master node! # The peer is not a master node!
neo.lib.logging.error('%r is not a master node', conn) neo.lib.logging.error('%r is not a master node', conn)
app.nm.remove(node) app.nm.remove(app.nm.getByAddress(conn.getAddress()))
conn.close() conn.close()
return return
...@@ -63,21 +96,13 @@ class ClientElectionHandler(EventHandler): ...@@ -63,21 +96,13 @@ class ClientElectionHandler(EventHandler):
dump(your_uuid)) dump(your_uuid))
raise ElectionFailure, 'new uuid supplied' raise ElectionFailure, 'new uuid supplied'
conn.setUUID(uuid) conn.setUUID(peer_uuid)
node.setUUID(uuid)
if app.uuid < uuid:
# I lost.
app.primary = False
app.negotiating_master_node_set.discard(conn.getAddress())
def answerPrimary(self, conn, primary_uuid, known_master_list):
app = self.app
# Register new master nodes. # Register new master nodes.
for address, uuid in known_master_list: for address, uuid in known_master_list:
if app.server == address: if app.server == address:
# This is self. # This is self.
assert peer_uuid != primary_uuid or uuid == your_uuid, (dump(uuid), dump(your_uuid))
continue continue
n = app.nm.getByAddress(address) n = app.nm.getByAddress(address)
if n is None: if n is None:
...@@ -108,84 +133,27 @@ class ClientElectionHandler(EventHandler): ...@@ -108,84 +133,27 @@ class ClientElectionHandler(EventHandler):
# Stop waiting for connections than primary master's to # Stop waiting for connections than primary master's to
# complete to exit election phase ASAP. # complete to exit election phase ASAP.
app.negotiating_master_node_set.clear() app.negotiating_master_node_set.clear()
return
primary_node = app.primary_master_node elect(app, peer_uuid, conn.getAddress())
if (primary_node is None or \
conn.getAddress() == primary_node.getAddress()) and \
not conn.isClosed():
# Request a node identification.
# There are 3 cases here:
# - Peer doesn't know primary node
# We must ask its identification so we exchange our uuids, to
# know which of us is secondary.
# - Peer knows primary node
# - He is the primary
# We must ask its identification, as part of the normal
# connection process
# - He is not the primary
# We don't need to ask its identification, as we will close
# this connection anyway (exiting election).
# Also, connection can be closed by peer after he sent
# AnswerPrimary if he finds the primary master before we
# give him our UUID.
# The connection gets closed before this message gets processed
# because this message might have been queued, but connection
# interruption takes effect as soon as received.
conn.ask(Packets.RequestIdentification(
NodeTypes.MASTER,
app.uuid,
app.server,
app.name
))
class ServerElectionHandler(MasterHandler):
def reelectPrimary(self, conn):
raise ElectionFailure, 'reelection requested'
def requestIdentification(self, conn, node_type, class ServerElectionHandler(BaseElectionHandler, MasterHandler):
uuid, address, name):
self.checkClusterName(name) def _setupNode(self, conn, node_type, uuid, address, node):
app = self.app app = self.app
if node_type != NodeTypes.MASTER: if node_type != NodeTypes.MASTER:
neo.lib.logging.info('reject a connection from a non-master') neo.lib.logging.info('reject a connection from a non-master')
raise NotReadyError raise NotReadyError
node = app.nm.getByAddress(address)
if node is None: if node is None:
node = app.nm.createMaster(address=address) node = app.nm.createMaster(address=address)
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.isBroken():
raise BrokenNodeDisallowedError
# supplied another uuid in case of conflict # supplied another uuid in case of conflict
while not app.isValidUUID(uuid, address): while not app.isValidUUID(uuid, address):
uuid = app.getNewUUID(node_type) uuid = app.getNewUUID(node_type)
node.setUUID(uuid) node.setUUID(uuid)
conn.setUUID(uuid) conn.setUUID(uuid)
elect(app, uuid, address)
p = Packets.AcceptIdentification( return uuid
NodeTypes.MASTER,
app.uuid,
app.pt.getPartitions(),
app.pt.getReplicas(),
uuid
)
conn.answer(p)
def announcePrimary(self, conn):
uuid = conn.getUUID()
if uuid is None:
raise ProtocolError('Not identified')
app = self.app
if app.primary:
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
node = app.nm.getByUUID(uuid)
app.primary = False
app.primary_master_node = node
app.negotiating_master_node_set.clear()
neo.lib.logging.info('%s is the primary', node)
...@@ -17,23 +17,20 @@ ...@@ -17,23 +17,20 @@
import neo import neo
from neo.lib.protocol import NodeTypes, Packets from neo.lib.protocol import NodeTypes, Packets
from neo.lib.protocol import BrokenNodeDisallowedError, ProtocolError from neo.lib.protocol import ProtocolError
from . import MasterHandler from . import MasterHandler
class IdentificationHandler(MasterHandler): class IdentificationHandler(MasterHandler):
def requestIdentification(self, conn, node_type, uuid, address, name): def requestIdentification(self, conn, *args, **kw):
super(IdentificationHandler, self).requestIdentification(conn, *args,
**kw)
handler = conn.getHandler()
assert not isinstance(handler, IdentificationHandler), handler
handler.connectionCompleted(conn)
self.checkClusterName(name) def _setupNode(self, conn, node_type, uuid, address, node):
app = self.app app = self.app
# handle conflicts and broken nodes
node = app.nm.getByUUID(uuid)
if node:
if node.isBroken():
raise BrokenNodeDisallowedError
else:
node = app.nm.getByAddress(address)
if node: if node:
if node.isRunning(): if node.isRunning():
# cloned/evil/buggy node connecting to us # cloned/evil/buggy node connecting to us
...@@ -56,13 +53,18 @@ class IdentificationHandler(MasterHandler): ...@@ -56,13 +53,18 @@ class IdentificationHandler(MasterHandler):
node.setState(state) node.setState(state)
node.setConnection(conn) node.setConnection(conn)
conn.setHandler(handler) conn.setHandler(handler)
conn.answer(Packets.AcceptIdentification(NodeTypes.MASTER, app.uuid,
app.pt.getPartitions(), app.pt.getReplicas(), uuid))
handler.connectionCompleted(conn)
app.broadcastNodesInformation([node]) app.broadcastNodesInformation([node])
return uuid
class SecondaryIdentificationHandler(MasterHandler): class SecondaryIdentificationHandler(MasterHandler):
def announcePrimary(self, conn):
# If we received AnnouncePrimary on a client connection, we might have
# set this handler on server connection, and might receive
# AnnouncePrimary there too. As we cannot reach this without already
# handling a first AnnouncePrimary, we can safely ignore this one.
pass
def _setupNode(self, conn, node_type, uuid, address, node): def _setupNode(self, conn, node_type, uuid, address, node):
# Nothing to do, storage will disconnect when it receives our answer. # Nothing to do, storage will disconnect when it receives our answer.
# Primary will do the checks. # Primary will do the checks.
......
...@@ -46,11 +46,17 @@ class PrimaryHandler(EventHandler): ...@@ -46,11 +46,17 @@ class PrimaryHandler(EventHandler):
raise PrimaryFailure, 'primary master is dead' raise PrimaryFailure, 'primary master is dead'
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
app = self.app
addr = conn.getAddress() addr = conn.getAddress()
node = self.app.nm.getByAddress(addr) node = app.nm.getByAddress(addr)
# connection successfull, set it as running # connection successfull, set it as running
node.setRunning() node.setRunning()
conn.ask(Packets.AskPrimary()) conn.ask(Packets.RequestIdentification(
NodeTypes.MASTER,
app.uuid,
app.server,
app.name,
))
super(PrimaryHandler, self).connectionCompleted(conn) super(PrimaryHandler, self).connectionCompleted(conn)
def reelectPrimary(self, conn): def reelectPrimary(self, conn):
...@@ -78,10 +84,11 @@ class PrimaryHandler(EventHandler): ...@@ -78,10 +84,11 @@ class PrimaryHandler(EventHandler):
if n.getUUID() is None: if n.getUUID() is None:
n.setUUID(uuid) n.setUUID(uuid)
def acceptIdentification(self, conn, node_type, def acceptIdentification(self, conn, node_type, uuid, num_partitions,
uuid, num_partitions, num_replicas, your_uuid, primary_uuid, known_master_list):
num_replicas, your_uuid):
app = self.app app = self.app
if primary_uuid != app.primary_master_node.getUUID():
raise PrimaryFailure('unexpected primary uuid')
node = app.nm.getByAddress(conn.getAddress()) node = app.nm.getByAddress(conn.getAddress())
assert node_type == NodeTypes.MASTER assert node_type == NodeTypes.MASTER
...@@ -92,14 +99,3 @@ class PrimaryHandler(EventHandler): ...@@ -92,14 +99,3 @@ class PrimaryHandler(EventHandler):
conn.setUUID(uuid) conn.setUUID(uuid)
node.setUUID(uuid) node.setUUID(uuid)
def answerPrimary(self, conn, primary_uuid, known_master_list):
app = self.app
if primary_uuid != app.primary_master_node.getUUID():
raise PrimaryFailure, 'unexpected primary uuid'
conn.ask(Packets.RequestIdentification(
NodeTypes.MASTER,
app.uuid,
app.server,
app.name
))
...@@ -25,5 +25,4 @@ class ShutdownHandler(BaseServiceHandler): ...@@ -25,5 +25,4 @@ class ShutdownHandler(BaseServiceHandler):
"""This class deals with events for a shutting down phase.""" """This class deals with events for a shutting down phase."""
requestIdentification = reject requestIdentification = reject
askPrimary = reject
askBeginTransaction = reject askBeginTransaction = reject
...@@ -72,5 +72,6 @@ class IdentificationHandler(EventHandler): ...@@ -72,5 +72,6 @@ class IdentificationHandler(EventHandler):
node.setConnection(conn, app.uuid < uuid) node.setConnection(conn, app.uuid < uuid)
# accept the identification and trigger an event # accept the identification and trigger an event
conn.answer(Packets.AcceptIdentification(NodeTypes.STORAGE, uuid and conn.answer(Packets.AcceptIdentification(NodeTypes.STORAGE, uuid and
app.uuid, app.pt.getPartitions(), app.pt.getReplicas(), uuid)) app.uuid, app.pt.getPartitions(), app.pt.getReplicas(), uuid,
app.master_node.getUUID(), ()))
handler.connectionCompleted(conn) handler.connectionCompleted(conn)
...@@ -779,8 +779,6 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -779,8 +779,6 @@ class ClientApplicationTests(NeoUnitTestBase):
def _ask6(conn): def _ask6(conn):
app.master_conn = conn app.master_conn = conn
app.uuid = 'C' * 16 app.uuid = 'C' * 16
# fourth iteration : connection to primary master succeeded
def _ask5(_):
app.trying_master_node = app.primary_master_node = Mock({ app.trying_master_node = app.primary_master_node = Mock({
'getAddress': ('127.0.0.1', 10011), 'getAddress': ('127.0.0.1', 10011),
'__str__': 'Fake master node', '__str__': 'Fake master node',
...@@ -803,7 +801,7 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -803,7 +801,7 @@ class ClientApplicationTests(NeoUnitTestBase):
# telling us what its address is.) # telling us what its address is.)
def _ask1(_): def _ask1(_):
pass pass
ask_func_list = [_ask1, _ask2, _ask3, _ask4, _ask5, _ask6, _ask7, ask_func_list = [_ask1, _ask2, _ask3, _ask4, _ask6, _ask7,
_ask8] _ask8]
def _ask_base(conn, _, handler=None): def _ask_base(conn, _, handler=None):
ask_func_list.pop(0)(conn) ask_func_list.pop(0)(conn)
......
...@@ -55,15 +55,19 @@ class MasterBootstrapHandlerTests(MasterHandlerTests): ...@@ -55,15 +55,19 @@ class MasterBootstrapHandlerTests(MasterHandlerTests):
conn = self.getConnection() conn = self.getConnection()
uuid = self.getNewUUID() uuid = self.getNewUUID()
self.handler.acceptIdentification(conn, NodeTypes.CLIENT, self.handler.acceptIdentification(conn, NodeTypes.CLIENT,
uuid, 100, 0, None) uuid, 100, 0, None, None, [])
self.checkClosed(conn) self.checkClosed(conn)
def test_acceptIdentification2(self): def test_acceptIdentification2(self):
""" No UUID supplied """ """ No UUID supplied """
conn = self.getConnection() conn = self.getConnection()
uuid = self.getNewUUID() uuid = self.getNewUUID()
node = Mock()
self.app.nm = Mock({'getByAddress': node, 'getByUUID': node})
self.checkProtocolErrorRaised(self.handler.acceptIdentification, self.checkProtocolErrorRaised(self.handler.acceptIdentification,
conn, NodeTypes.MASTER, uuid, 100, 0, None) conn, NodeTypes.MASTER, uuid, 100, 0, None,
uuid, [(conn.getAddress(), uuid)],
)
def test_acceptIdentification3(self): def test_acceptIdentification3(self):
""" identification accepted """ """ identification accepted """
...@@ -73,9 +77,9 @@ class MasterBootstrapHandlerTests(MasterHandlerTests): ...@@ -73,9 +77,9 @@ class MasterBootstrapHandlerTests(MasterHandlerTests):
your_uuid = self.getNewUUID() your_uuid = self.getNewUUID()
partitions = 100 partitions = 100
replicas = 2 replicas = 2
self.app.nm = Mock({'getByAddress': node}) self.app.nm = Mock({'getByAddress': node, 'getByUUID': node})
self.handler.acceptIdentification(conn, NodeTypes.MASTER, uuid, self.handler.acceptIdentification(conn, NodeTypes.MASTER, uuid,
partitions, replicas, your_uuid) partitions, replicas, your_uuid, uuid, [(conn.getAddress(), uuid)])
self.assertEqual(self.app.uuid, your_uuid) self.assertEqual(self.app.uuid, your_uuid)
self.checkUUIDSet(node, uuid) self.checkUUIDSet(node, uuid)
self.assertTrue(isinstance(self.app.pt, PartitionTable)) self.assertTrue(isinstance(self.app.pt, PartitionTable))
...@@ -88,38 +92,6 @@ class MasterBootstrapHandlerTests(MasterHandlerTests): ...@@ -88,38 +92,6 @@ class MasterBootstrapHandlerTests(MasterHandlerTests):
port += 1 port += 1
return master_list return master_list
def test_answerPrimary1(self):
""" Primary not known, master udpated """
node, uuid = Mock(), self.getNewUUID()
conn = self.getConnection()
master_list = [(('127.0.0.1', 1000), uuid)]
self.app.primary_master_node = Mock()
self.app.trying_master_node = Mock()
self.app.nm = Mock({'getByAddress': node})
self.handler.answerPrimary(conn, None, master_list)
self.checkUUIDSet(node, uuid)
# previously known primary master forgoten
self.assertEqual(self.app.primary_master_node, None)
self.assertEqual(self.app.trying_master_node, None)
self.checkClosed(conn)
def test_answerPrimary2(self):
""" Primary known """
current_node = Mock({'__repr__': '1'})
node, uuid = Mock({'__repr__': '2'}), self.getNewUUID()
conn = self.getConnection()
master_list = [(('127.0.0.1', 1000), uuid)]
self.app.primary_master_node = None
self.app.trying_master_node = current_node
self.app.nm = Mock({
'getByAddress': node,
'getByUUID': node,
})
self.handler.answerPrimary(conn, uuid, [])
self.assertEqual(self.app.trying_master_node, None)
self.assertTrue(self.app.primary_master_node is node)
self.checkClosed(conn)
def test_answerPartitionTable(self): def test_answerPartitionTable(self):
conn = self.getConnection() conn = self.getConnection()
self.app.pt = Mock() self.app.pt = Mock()
......
...@@ -45,21 +45,23 @@ class StorageBootstrapHandlerTests(NeoUnitTestBase): ...@@ -45,21 +45,23 @@ class StorageBootstrapHandlerTests(NeoUnitTestBase):
def test_acceptIdentification1(self): def test_acceptIdentification1(self):
""" Not a storage node """ """ Not a storage node """
uuid = self.getNewUUID() uuid = self.getNewUUID()
node_uuid = self.getNewUUID()
conn = self.getConnection() conn = self.getConnection()
conn = self.getConnection() self.app.primary_master_node = node = Mock({'getUUID': node_uuid})
node = Mock()
self.app.nm = Mock({'getByAddress': node}) self.app.nm = Mock({'getByAddress': node})
self.handler.acceptIdentification(conn, NodeTypes.CLIENT, uuid, self.handler.acceptIdentification(conn, NodeTypes.CLIENT, uuid,
10, 0, None) 10, 0, None, node_uuid, [])
self.checkClosed(conn) self.checkClosed(conn)
def test_acceptIdentification2(self): def test_acceptIdentification2(self):
uuid = self.getNewUUID() uuid = self.getNewUUID()
node_uuid = self.getNewUUID()
conn = self.getConnection() conn = self.getConnection()
node = Mock({'getConnection': conn}) self.app.primary_master_node = node = Mock({'getConnection': conn,
'getUUID': node_uuid})
self.app.nm = Mock({'getByAddress': node}) self.app.nm = Mock({'getByAddress': node})
self.handler.acceptIdentification(conn, NodeTypes.STORAGE, uuid, self.handler.acceptIdentification(conn, NodeTypes.STORAGE, uuid,
10, 0, None) 10, 0, None, node_uuid, [])
self.checkUUIDSet(node, uuid) self.checkUUIDSet(node, uuid)
......
This diff is collapsed.
...@@ -82,6 +82,10 @@ class StorageIdentificationHandlerTests(NeoUnitTestBase): ...@@ -82,6 +82,10 @@ class StorageIdentificationHandlerTests(NeoUnitTestBase):
uuid = self.getNewUUID() uuid = self.getNewUUID()
conn = self.getFakeConnection(uuid=uuid) conn = self.getFakeConnection(uuid=uuid)
node = self.app.nm.createClient(uuid=uuid) node = self.app.nm.createClient(uuid=uuid)
master_uuid = self.getNewUUID()
self.app.master_node = Mock({
'getUUID': master_uuid,
})
self.identification.requestIdentification(conn, NodeTypes.CLIENT, uuid, self.identification.requestIdentification(conn, NodeTypes.CLIENT, uuid,
None, self.app.name) None, self.app.name)
self.assertTrue(node.isRunning()) self.assertTrue(node.isRunning())
...@@ -89,11 +93,12 @@ class StorageIdentificationHandlerTests(NeoUnitTestBase): ...@@ -89,11 +93,12 @@ class StorageIdentificationHandlerTests(NeoUnitTestBase):
self.assertEqual(node.getUUID(), uuid) self.assertEqual(node.getUUID(), uuid)
self.assertTrue(node.getConnection() is conn) self.assertTrue(node.getConnection() is conn)
args = self.checkAcceptIdentification(conn, decode=True) args = self.checkAcceptIdentification(conn, decode=True)
node_type, address, _np, _nr, _uuid = args node_type, address, _np, _nr, _uuid, _master_uuid, _master_list = args
self.assertEqual(node_type, NodeTypes.STORAGE) self.assertEqual(node_type, NodeTypes.STORAGE)
self.assertEqual(address, None) self.assertEqual(address, None)
self.assertEqual(_uuid, uuid) self.assertEqual(_uuid, uuid)
self.assertEqual(_master_uuid, master_uuid)
# TODO: check _master_list ?
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -51,7 +51,7 @@ class BootstrapManagerTests(NeoUnitTestBase): ...@@ -51,7 +51,7 @@ class BootstrapManagerTests(NeoUnitTestBase):
conn = self.getFakeConnection(address=address) conn = self.getFakeConnection(address=address)
self.bootstrap.current = self.app.nm.createMaster(address=address) self.bootstrap.current = self.app.nm.createMaster(address=address)
self.bootstrap.connectionCompleted(conn) self.bootstrap.connectionCompleted(conn)
self.checkAskPrimary(conn) self.checkRequestIdentification(conn)
def testHandleNotReady(self): def testHandleNotReady(self):
# the primary is not ready # the primary is not ready
......
...@@ -424,17 +424,7 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -424,17 +424,7 @@ class ConnectionTests(NeoUnitTestBase):
self._checkPacketReceived(0) self._checkPacketReceived(0)
self._checkReadBuf(bc, '') self._checkReadBuf(bc, '')
# give some data to analyse p = Packets.AnswerPrimary(self.getNewUUID())
master_list = (
(("127.0.0.1", 2135), self.getNewUUID()),
(("127.0.0.1", 2135), self.getNewUUID()),
(("127.0.0.1", 2235), self.getNewUUID()),
(("127.0.0.1", 2134), self.getNewUUID()),
(("127.0.0.1", 2335), self.getNewUUID()),
(("127.0.0.1", 2133), self.getNewUUID()),
(("127.0.0.1", 2435), self.getNewUUID()),
(("127.0.0.1", 2132), self.getNewUUID()))
p = Packets.AnswerPrimary(self.getNewUUID(), master_list)
p.setId(1) p.setId(1)
p_data = ''.join(p.encode()) p_data = ''.join(p.encode())
data_edge = len(p_data) - 1 data_edge = len(p_data) - 1
...@@ -461,30 +451,10 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -461,30 +451,10 @@ class ConnectionTests(NeoUnitTestBase):
# give multiple packet # give multiple packet
bc = self._makeConnection() bc = self._makeConnection()
bc._queue = Mock() bc._queue = Mock()
# packet 1 p1 = Packets.AnswerPrimary(self.getNewUUID())
master_list = (
(("127.0.0.1", 2135), self.getNewUUID()),
(("127.0.0.1", 2135), self.getNewUUID()),
(("127.0.0.1", 2235), self.getNewUUID()),
(("127.0.0.1", 2134), self.getNewUUID()),
(("127.0.0.1", 2335), self.getNewUUID()),
(("127.0.0.1", 2133), self.getNewUUID()),
(("127.0.0.1", 2435), self.getNewUUID()),
(("127.0.0.1", 2132), self.getNewUUID()))
p1 = Packets.AnswerPrimary(self.getNewUUID(), master_list)
p1.setId(1) p1.setId(1)
self._appendPacketToReadBuf(bc, p1) self._appendPacketToReadBuf(bc, p1)
# packet 2 p2 = Packets.AnswerPrimary( self.getNewUUID())
master_list = (
(("127.0.0.1", 2135), self.getNewUUID()),
(("127.0.0.1", 2135), self.getNewUUID()),
(("127.0.0.1", 2235), self.getNewUUID()),
(("127.0.0.1", 2134), self.getNewUUID()),
(("127.0.0.1", 2335), self.getNewUUID()),
(("127.0.0.1", 2133), self.getNewUUID()),
(("127.0.0.1", 2435), self.getNewUUID()),
(("127.0.0.1", 2132), self.getNewUUID()))
p2 = Packets.AnswerPrimary( self.getNewUUID(), master_list)
p2.setId(2) p2.setId(2)
self._appendPacketToReadBuf(bc, p2) self._appendPacketToReadBuf(bc, p2)
self.assertEqual(len(bc.read_buf), len(p1) + len(p2)) self.assertEqual(len(bc.read_buf), len(p1) + len(p2))
...@@ -519,16 +489,7 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -519,16 +489,7 @@ class ConnectionTests(NeoUnitTestBase):
# give an expected packet # give an expected packet
bc = self._makeConnection() bc = self._makeConnection()
bc._queue = Mock() bc._queue = Mock()
master_list = ( p = Packets.AnswerPrimary(self.getNewUUID())
(("127.0.0.1", 2135), self.getNewUUID()),
(("127.0.0.1", 2135), self.getNewUUID()),
(("127.0.0.1", 2235), self.getNewUUID()),
(("127.0.0.1", 2134), self.getNewUUID()),
(("127.0.0.1", 2335), self.getNewUUID()),
(("127.0.0.1", 2133), self.getNewUUID()),
(("127.0.0.1", 2435), self.getNewUUID()),
(("127.0.0.1", 2132), self.getNewUUID()))
p = Packets.AnswerPrimary(self.getNewUUID(), master_list)
p.setId(1) p.setId(1)
self._appendPacketToReadBuf(bc, p) self._appendPacketToReadBuf(bc, p)
bc.analyse() bc.analyse()
...@@ -627,16 +588,7 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -627,16 +588,7 @@ class ConnectionTests(NeoUnitTestBase):
# With aborted set to false # With aborted set to false
# patch receive method to return data # patch receive method to return data
def receive(self): def receive(self):
master_list = ((("127.0.0.1", 2135), self.getNewUUID()), p = Packets.AnswerPrimary(self.getNewUUID())
(("127.0.0.1", 2136), self.getNewUUID()),
(("127.0.0.1", 2235), self.getNewUUID()),
(("127.0.0.1", 2134), self.getNewUUID()),
(("127.0.0.1", 2335), self.getNewUUID()),
(("127.0.0.1", 2133), self.getNewUUID()),
(("127.0.0.1", 2435), self.getNewUUID()),
(("127.0.0.1", 2132), self.getNewUUID()))
uuid = self.getNewUUID()
p = Packets.AnswerPrimary(uuid, master_list)
p.setId(1) p.setId(1)
return ''.join(p.encode()) return ''.join(p.encode())
DoNothingConnector.receive = receive DoNothingConnector.receive = receive
......
...@@ -91,16 +91,33 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -91,16 +91,33 @@ class ProtocolTests(NeoUnitTestBase):
self.assertEqual(port, 9080) self.assertEqual(port, 9080)
self.assertEqual(name, "unittest") self.assertEqual(name, "unittest")
def test_12_AcceptIdentification(self): def _testAcceptIdentification(self, master_list):
uuid1, uuid2 = self.getNewUUID(), self.getNewUUID() uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
uuid3 = self.getNewUUID()
p = Packets.AcceptIdentification(NodeTypes.CLIENT, uuid1, p = Packets.AcceptIdentification(NodeTypes.CLIENT, uuid1,
10, 20, uuid2) 10, 20, uuid2, uuid3, master_list)
node, p_uuid, nb_partitions, nb_replicas, your_uuid = p.decode() (node, p_uuid, nb_partitions, nb_replicas, your_uuid, primary_uuid,
pmaster_list) = p.decode()
self.assertEqual(node, NodeTypes.CLIENT) self.assertEqual(node, NodeTypes.CLIENT)
self.assertEqual(p_uuid, uuid1) self.assertEqual(p_uuid, uuid1)
self.assertEqual(nb_partitions, 10) self.assertEqual(nb_partitions, 10)
self.assertEqual(nb_replicas, 20) self.assertEqual(nb_replicas, 20)
self.assertEqual(your_uuid, uuid2) self.assertEqual(your_uuid, uuid2)
self.assertEqual(primary_uuid, uuid3)
self.assertEqual(pmaster_list, master_list)
def test_12_AcceptIdentification(self):
self._testAcceptIdentification([
(("0.0.0.0", 1), None),
(("255.255.255.255", 2), self.getNewUUID()),
])
def test_12_AcceptIdentificationIPv6(self):
self._testAcceptIdentification([
(("::", 1), None),
(("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", 2),
self.getNewUUID()),
])
def test_13_askPrimary(self): def test_13_askPrimary(self):
p = Packets.AskPrimary() p = Packets.AskPrimary()
...@@ -111,28 +128,9 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -111,28 +128,9 @@ class ProtocolTests(NeoUnitTestBase):
uuid1 = self.getNewUUID() uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID() uuid2 = self.getNewUUID()
uuid3 = self.getNewUUID() uuid3 = self.getNewUUID()
master_list = [(("127.0.0.1", 1), uuid1), p = Packets.AnswerPrimary(uuid)
(("127.0.0.2", 2), uuid2), primary_uuid, = p.decode()
(("127.0.0.3", 3), uuid3)]
p = Packets.AnswerPrimary(uuid, master_list)
primary_uuid, p_master_list = p.decode()
self.assertEqual(primary_uuid, uuid)
self.assertEqual(master_list, p_master_list)
def test_14_bis_answerPrimaryIPv6(self):
""" Try to get primary master through IPv6 """
self.address_type = socket.AF_INET6
uuid = self.getNewUUID()
uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID()
uuid3 = self.getNewUUID()
master_list = [(("::1", 1), uuid1),
(("::2", 2), uuid2),
(("::3", 3), uuid3)]
p = Packets.AnswerPrimary(uuid, master_list)
primary_uuid, p_master_list = p.decode()
self.assertEqual(primary_uuid, uuid) self.assertEqual(primary_uuid, uuid)
self.assertEqual(master_list, p_master_list)
def test_15_announcePrimary(self): def test_15_announcePrimary(self):
p = Packets.AnnouncePrimary() p = Packets.AnnouncePrimary()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment