Commit f051b7a0 authored by Julien Muchembled

On NM update, fix removal of nodes that aren't part of the cluster anymore

In order to do that correctly, this commit contains several other changes:

When connecting to a primary master, a full node list always follows the
identification. For storage nodes, this means that they now know all nodes
during the RECOVERING phase.

The initial full node list now always contains a node tuple for:
- the server-side node (i.e. the primary master): on a master, this is
  done by always having a node describing itself in its node manager.
- the client-side node, to make sure it gets an id timestamp:
  now an admin node also receives a node for itself.
parent 9e54a8e0
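The core of the fix is in NodeManager.update below: on the first notification after identification (while app.id_timestamp is still None), the nodes received in the full list are collected, and any node known from a previous connection but absent from that list is dropped. The following is a minimal standalone sketch of that idea only, with plain sets standing in for node objects and a callback standing in for PartitionTable.dropNode; it is not the actual NEO code.

```python
# Illustrative only: plain values replace NEO node objects and the partition
# table; the real logic lives in NodeManager.update in this commit.
def first_full_update(known, notified, pt_drop):
    """known: nodes registered from a previous connection;
    notified: nodes listed in the first full notification;
    pt_drop: returns True when the node holds no cell and may be forgotten."""
    known |= notified                 # register every notified node
    for node in known - notified:     # stale nodes from the old connection
        if pt_drop(node):             # drop only if the partition table agrees
            known.discard(node)
    return known

# 's3' was known before reconnecting but is no longer part of the cluster.
print(first_full_update({'m1', 's2', 's3'}, {'m1', 's2', 's4'}, lambda n: True))
```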
@@ -90,9 +90,7 @@ class ConfigurationManager(object):
 
     def getMasters(self):
         """ Get the master node list except itself """
-        masters = self.__get('masters')
-        # load master node list except itself
-        return util.parseMasterList(masters, except_node=self.getBind())
+        return util.parseMasterList(self.__get('masters'))
 
     def getBind(self):
         """ Get the address to bind to """
...
@@ -414,7 +414,7 @@ class NodeManager(EventQueue):
     def update(self, app, timestamp, node_list):
         assert self._timestamp < timestamp, (self._timestamp, timestamp)
         self._timestamp = timestamp
-        node_set = self._node_set.copy() if app.id_timestamp is None else None
+        added_list = [] if app.id_timestamp is None else None
         for node_type, addr, uuid, state, id_timestamp in node_list:
             # This should be done here (although klass might not be used in this
             # iteration), as it raises if type is not valid.
@@ -427,9 +427,7 @@ class NodeManager(EventQueue):
             log_args = node_type, uuid_str(uuid), addr, state, id_timestamp
             if node is None:
-                if state == NodeStates.DOWN:
-                    logging.debug('NOT creating node %s %s %s %s %s', *log_args)
-                    continue
+                assert state != NodeStates.DOWN, (self._node_set,) + log_args
                 node = self._createNode(klass, address=addr, uuid=uuid,
                                         state=state)
                 logging.debug('creating node %r', node)
@@ -451,8 +449,9 @@ class NodeManager(EventQueue):
                         # reconnect to the master because they cleared their
                         # partition table upon disconnection.
                         node.getConnection().close()
-                    if app.uuid != uuid:
-                        app.pt.dropNode(node)
+                    if app.uuid != uuid: # XXX
+                        dropped = app.pt.dropNode(node)
+                        assert dropped, node
                     self.remove(node)
                     continue
                 logging.debug('updating node %r to %s %s %s %s %s',
@@ -463,12 +462,15 @@ class NodeManager(EventQueue):
             node.id_timestamp = id_timestamp
             if app.uuid == uuid:
                 app.id_timestamp = id_timestamp
-        if node_set:
+            if added_list is not None:
+                added_list.append(node)
+        if added_list is not None:
+            assert app.id_timestamp is not None
             # For the first notification, we receive a full list of nodes from
             # the master. Remove all unknown nodes from a previous connection.
-            for node in node_set - self._node_set:
-                app.pt.dropNode(node)
-                self.remove(node)
+            for node in self._node_set.difference(added_list):
+                if app.pt.dropNode(node):
+                    self.remove(node)
         self.log()
         self.executeQueuedEvents()
...
@@ -196,8 +196,10 @@ class PartitionTable(object):
                 break
 
     def dropNode(self, node):
-        count = self.count_dict.pop(node, None)
-        assert not count, (node, count)
+        count = self.count_dict.get(node)
+        if count == 0:
+            del self.count_dict[node]
+        return not count
 
     def load(self, ptid, row_list, nm):
         """
...
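The dropNode change above makes the partition table report whether the node could actually be forgotten, instead of asserting that it held no cell. Below is a standalone sketch of the new contract; count_dict is simplified to a plain dict from node to the number of cells it still holds, and the class is a placeholder, not neo.lib.pt.PartitionTable.

```python
# Simplified stand-in for the partition table; only the count_dict
# bookkeeping relevant to dropNode is reproduced here.
class PartitionTableSketch(object):

    def __init__(self):
        self.count_dict = {}  # node -> number of cells it still holds

    def dropNode(self, node):
        count = self.count_dict.get(node)
        if count == 0:
            del self.count_dict[node]
        return not count      # True when absent or without any cell

pt = PartitionTableSketch()
pt.count_dict.update(s1=0, s2=3)
print(pt.dropNode('s1'))  # True:  no cell left, entry removed
print(pt.dropNode('s2'))  # False: still counted in 3 cells, kept
print(pt.dropNode('s3'))  # True:  unknown node, nothing to forget
```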
@@ -148,14 +148,9 @@ def parseNodeAddress(address, port_opt=None):
     # or return either raw host & port or getaddrinfo return value.
     return socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)[0][4][:2]
 
-def parseMasterList(masters, except_node=None):
+def parseMasterList(masters):
     assert masters, 'At least one master must be defined'
-    master_node_list = []
-    for node in masters.split():
-        address = parseNodeAddress(node)
-        if address != except_node:
-            master_node_list.append(address)
-    return master_node_list
+    return map(parseNodeAddress, masters.split())
 
 class ReadBuffer(object):
...
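With the `except_node` filtering gone, parseMasterList simply maps parseNodeAddress over the whitespace-separated list, so a node now also gets its own address back. A rough standalone equivalent is sketched below; parse_address is a simplified stand-in for parseNodeAddress, which in NEO also resolves hostnames and handles IPv6.

```python
# Rough stand-in for the simplified parseMasterList; the real
# parseNodeAddress does more than splitting on ':'.
def parse_address(node):
    host, port = node.rsplit(':', 1)
    return host, int(port)

def parse_master_list(masters):
    assert masters, 'At least one master must be defined'
    return map(parse_address, masters.split())

# The caller's own address is no longer filtered out of the result.
print(list(parse_master_list('127.0.0.1:10000 127.0.0.1:10001')))
# -> [('127.0.0.1', 10000), ('127.0.0.1', 10001)]
```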
@@ -57,6 +57,9 @@ class Application(BaseApplication):
     backup_tid = None
     backup_app = None
     truncate_tid = None
+    uuid = property(
+        lambda self: self._node.getUUID(),
+        lambda self, uuid: self._node.setUUID(uuid))
 
     def __init__(self, config):
         super(Application, self).__init__(
@@ -71,6 +74,8 @@ class Application(BaseApplication):
         self.storage_starting_set = set()
         for master_address in config.getMasters():
             self.nm.createMaster(address=master_address)
+        self._node = self.nm.createMaster(address=self.server,
+                                          uuid=config.getUUID())
         logging.debug('IP address is %s, port is %d', *self.server)
@@ -91,8 +96,6 @@ class Application(BaseApplication):
         self.primary_master_node = None
         self.cluster_state = None
 
-        self.uuid = config.getUUID()
-
         # election related data
         self.unconnected_master_node_set = set()
         self.negotiating_master_node_set = set()
@@ -185,6 +188,8 @@ class Application(BaseApplication):
         # handle new connected masters
         for node in self.nm.getMasterList():
+            if node is self._node:
+                continue
             node.setUnknown()
             self.unconnected_master_node_set.add(node.getAddress())
@@ -232,11 +237,7 @@ class Application(BaseApplication):
                 self.primary = self.primary is None
                 break
 
-    def broadcastNodesInformation(self, node_list, exclude=None):
-        """
-        Broadcast changes for a set a nodes
-        Send only one packet per connection to reduce bandwidth
-        """
+    def getNodeInformationDict(self, node_list):
         node_dict = defaultdict(list)
         # group modified nodes by destination node type
         for node in node_list:
@@ -251,7 +252,14 @@ class Application(BaseApplication):
             if node.isStorage():
                 continue
             node_dict[NodeTypes.MASTER].append(node_info)
+        return node_dict
 
+    def broadcastNodesInformation(self, node_list, exclude=None):
+        """
+        Broadcast changes for a set a nodes
+        Send only one packet per connection to reduce bandwidth
+        """
+        node_dict = self.getNodeInformationDict(node_list)
         now = monotonic_time()
         # send at most one non-empty notification packet per node
         for node in self.nm.getIdentifiedList():
@@ -340,12 +348,7 @@ class Application(BaseApplication):
         if self.uuid is None:
             self.uuid = self.getNewUUID(None, self.server, NodeTypes.MASTER)
             logging.info('My UUID: ' + uuid_str(self.uuid))
-        else:
-            in_conflict = self.nm.getByUUID(self.uuid)
-            if in_conflict is not None:
-                logging.warning('UUID conflict at election exit with %r',
-                    in_conflict)
-                in_conflict.setUUID(None)
+        self._node.setRunning()
 
         # Do not restart automatically if ElectionFailure is raised, in order
         # to avoid a split of the database. For example, with 2 machines with
...
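Since the master now keeps a node object describing itself in its node manager, app.uuid becomes a view on that node's UUID rather than a separate attribute, as the property added in the hunk above shows. The following is a minimal standalone illustration of that property pattern; NodeSketch and AppSketch are placeholders, not NEO classes.

```python
# Placeholders illustrating the uuid property added to the master Application:
# reads and writes go through the node object that describes the master itself.
class NodeSketch(object):
    _uuid = None
    def getUUID(self):
        return self._uuid
    def setUUID(self, uuid):
        self._uuid = uuid

class AppSketch(object):
    uuid = property(
        lambda self: self._node.getUUID(),
        lambda self, uuid: self._node.setUUID(uuid))
    def __init__(self):
        self._node = NodeSketch()

app = AppSketch()
app.uuid = 0x1234
print(app.uuid == app._node.getUUID())  # True: a single source of truth
```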
@@ -28,8 +28,6 @@ class MasterHandler(EventHandler):
     def connectionCompleted(self, conn, new=None):
         if new is None:
             super(MasterHandler, self).connectionCompleted(conn)
-        elif new:
-            self._notifyNodeInformation(conn)
 
     def requestIdentification(self, conn, node_type, uuid, address, name, _):
         self.checkClusterName(name)
@@ -49,7 +47,7 @@ class MasterHandler(EventHandler):
         else:
             primary_address = None
 
-        known_master_list = [(app.server, app.uuid)]
+        known_master_list = []
         for n in app.nm.getMasterList():
             if n.isBroken():
                 continue
@@ -84,11 +82,12 @@ class MasterHandler(EventHandler):
                 self.app.getLastTransaction()))
 
     def _notifyNodeInformation(self, conn):
-        nm = self.app.nm
-        node_list = []
-        node_list.extend(n.asTuple() for n in nm.getMasterList())
-        node_list.extend(n.asTuple() for n in nm.getClientList())
-        node_list.extend(n.asTuple() for n in nm.getStorageList())
+        app = self.app
+        node = app.nm.getByUUID(conn.getUUID())
+        node_list = app.nm.getList()
+        node_list.remove(node)
+        node_list = ([node.asTuple()] # for id_timestamp
+            + app.getNodeInformationDict(node_list)[node.getType()])
         conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
 
     def askPartitionTable(self, conn):
@@ -104,7 +103,6 @@ class BaseServiceHandler(MasterHandler):
     """This class deals with events for a service phase."""
 
     def connectionCompleted(self, conn, new):
-        self._notifyNodeInformation(conn)
         pt = self.app.pt
         conn.send(Packets.SendPartitionTable(pt.getID(), pt.getRowList()))
...
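The rewritten _notifyNodeInformation above puts the peer's own tuple first, which is how it learns its id_timestamp, and appends whatever getNodeInformationDict selects for the peer's node type. The toy sketch below only shows that assembly order; plain tuples stand in for node objects and select_for is an arbitrary stand-in for getNodeInformationDict, whose real grouping rules live in master/app.py and are not reproduced here.

```python
# Toy assembly of the notification payload: 'peer' is the identified node,
# 'select_for' is a placeholder for getNodeInformationDict.
def build_notification(peer, node_list, select_for):
    others = [n for n in node_list if n is not peer]
    return [peer] + select_for(others, peer[0])

nodes = [
    ('MASTER', 0x100),
    ('STORAGE', 0x200),
    ('CLIENT', 0x300),
]

def select_for(node_list, dest_type):
    # arbitrary illustrative rule, not NEO's: clients are not told about clients
    return [n for n in node_list
            if not (dest_type == 'CLIENT' and n[0] == 'CLIENT')]

print(build_notification(nodes[2], nodes, select_for))
# the client's own tuple comes first, then the nodes selected for clients
```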
@@ -33,13 +33,6 @@ class ClientServiceHandler(MasterHandler):
             app.broadcastNodesInformation([node])
             app.nm.remove(node)
 
-    def _notifyNodeInformation(self, conn):
-        nm = self.app.nm
-        node_list = [nm.getByUUID(conn.getUUID()).asTuple()] # for id_timestamp
-        node_list.extend(n.asTuple() for n in nm.getMasterList())
-        node_list.extend(n.asTuple() for n in nm.getStorageList())
-        conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
-
     def askBeginTransaction(self, conn, tid):
         """
         A client request a TID, nothing is kept about it until the finish.
...
@@ -27,6 +27,7 @@ class IdentificationHandler(MasterHandler):
                 **kw)
         handler = conn.getHandler()
         assert not isinstance(handler, IdentificationHandler), handler
+        handler._notifyNodeInformation(conn)
         handler.connectionCompleted(conn, True)
 
     def _setupNode(self, conn, node_type, uuid, address, node):
...
@@ -38,10 +38,6 @@ class SecondaryMasterHandler(MasterHandler):
     def reelectPrimary(self, conn):
         raise ElectionFailure, 'reelection requested'
 
-    def _notifyNodeInformation(self, conn):
-        node_list = [n.asTuple() for n in self.app.nm.getMasterList()]
-        conn.send(Packets.NotifyNodeInformation(monotonic_time(), node_list))
-
 
 class PrimaryHandler(EventHandler):
     """ Handler used by secondaries to handle primary master"""
...
@@ -164,7 +164,7 @@ class NodeManagerTests(NeoUnitTestBase):
                 NodeStates.UNKNOWN, None),
         )
         app = Mock()
-        app.pt = Mock()
+        app.pt = Mock({'dropNode': True})
         # update manager content
         manager.update(app, time(), node_list)
         # - the client gets down
...
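The test mock above needs dropNode to return a true value because, with this commit, NodeManager.update only removes a node when the partition table confirms the drop (`if app.pt.dropNode(node): self.remove(node)`). The sketch below only mimics the "method name to canned return value" idea to show why a bare Mock() would no longer exercise the removal path; MockSketch is a placeholder, not the actual neo.tests.mock API.

```python
# Minimal stand-in for the idea behind Mock({'dropNode': True}).
class MockSketch(object):
    def __init__(self, returns=None):
        self._returns = returns or {}
    def __getattr__(self, name):
        return lambda *args, **kw: self._returns.get(name)

removed = []
def update(pt, node):
    # mirrors the new NodeManager.update behaviour for a dropped node
    if pt.dropNode(node):
        removed.append(node)

update(MockSketch(), 'n1')                    # dropNode() -> None, node kept
update(MockSketch({'dropNode': True}), 'n2')  # dropNode() -> True, node removed
print(removed)  # ['n2']
```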
@@ -336,7 +336,7 @@ class ServerNode(Node):
         self.daemon = True
         self.node_name = '%s_%u' % (self.node_type, port)
         kw.update(getCluster=name, getBind=address,
-            getMasters=master_nodes and parseMasterList(master_nodes, address))
+            getMasters=master_nodes and parseMasterList(master_nodes))
         super(ServerNode, self).__init__(Mock(kw))
 
     def getVirtualAddress(self):
...