Commit 23b6a66a authored by Julien Muchembled

Reimplement election (of the primary master)

The election is not a separate process anymore.
It happens during the RECOVERING phase, and there's no use of timeouts anymore.

Each master node keeps a timestamp of when it started to play the primary role,
and the node with the smallest timestamp is elected. The election stops when
the cluster is started: as long as it is operational, the primary master can't
be deposed.

An election must happen whenever the cluster is not operational anymore, to
handle the case of a network cut between a primary master and all other nodes:
then another master node (secondary) takes over and when the initial primary
master is back, it loses against the new primary master if the cluster is
already started.
parent 0a3dba8b
...@@ -24,8 +24,6 @@ ...@@ -24,8 +24,6 @@
This is mainly the case for : This is mainly the case for :
- Client rejected before the cluster is operational - Client rejected before the cluster is operational
- Empty storages rejected during recovery process - Empty storages rejected during recovery process
Masters implies in the election process should still reject any connection
as the primary master is still unknown.
- Implement transaction garbage collection API (FEATURE) - Implement transaction garbage collection API (FEATURE)
NEO packing implementation does not update transaction metadata when NEO packing implementation does not update transaction metadata when
deleting object revisions. This inconsistency must be made possible to deleting object revisions. This inconsistency must be made possible to
......
...@@ -194,17 +194,17 @@ class Application(ThreadedApplication): ...@@ -194,17 +194,17 @@ class Application(ThreadedApplication):
self.nm.reset() self.nm.reset()
if self.primary_master_node is not None: if self.primary_master_node is not None:
# If I know a primary master node, pinpoint it. # If I know a primary master node, pinpoint it.
self.trying_master_node = self.primary_master_node node = self.primary_master_node
self.primary_master_node = None self.primary_master_node = None
else: else:
# Otherwise, check one by one. # Otherwise, check one by one.
master_list = self.nm.getMasterList() master_list = self.nm.getMasterList()
index = (index + 1) % len(master_list) index = (index + 1) % len(master_list)
self.trying_master_node = master_list[index] node = master_list[index]
# Connect to master # Connect to master
conn = MTClientConnection(self, conn = MTClientConnection(self,
self.notifications_handler, self.notifications_handler,
node=self.trying_master_node, node=node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
p = Packets.RequestIdentification( p = Packets.RequestIdentification(
NodeTypes.CLIENT, self.uuid, None, self.name, None) NodeTypes.CLIENT, self.uuid, None, self.name, None)
...@@ -212,10 +212,8 @@ class Application(ThreadedApplication): ...@@ -212,10 +212,8 @@ class Application(ThreadedApplication):
ask(conn, p, handler=handler) ask(conn, p, handler=handler)
except ConnectionClosed: except ConnectionClosed:
fail_count += 1 fail_count += 1
continue else:
# If we reached the primary master node, mark as connected self.primary_master_node = node
if self.primary_master_node is not None and \
self.primary_master_node is self.trying_master_node:
break break
else: else:
raise NEOPrimaryMasterLost( raise NEOPrimaryMasterLost(
......
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import PrimaryElected
from neo.lib.handler import MTEventHandler from neo.lib.handler import MTEventHandler
from neo.lib.pt import MTPartitionTable as PartitionTable from neo.lib.pt import MTPartitionTable as PartitionTable
from neo.lib.protocol import NodeStates, ProtocolError from neo.lib.protocol import NodeStates
from neo.lib.util import dump
from . import AnswerBaseHandler from . import AnswerBaseHandler
from ..exception import NEOStorageError from ..exception import NEOStorageError
...@@ -26,10 +26,6 @@ from ..exception import NEOStorageError ...@@ -26,10 +26,6 @@ from ..exception import NEOStorageError
class PrimaryBootstrapHandler(AnswerBaseHandler): class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """ """ Bootstrap handler used when looking for the primary master """
def notReady(self, conn, message):
self.app.trying_master_node = None
conn.close()
def answerPartitionTable(self, conn, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
assert row_list assert row_list
self.app.pt.load(ptid, row_list, self.app.nm) self.app.pt.load(ptid, row_list, self.app.nm)
...@@ -40,57 +36,14 @@ class PrimaryBootstrapHandler(AnswerBaseHandler): ...@@ -40,57 +36,14 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
class PrimaryNotificationsHandler(MTEventHandler): class PrimaryNotificationsHandler(MTEventHandler):
""" Handler that process the notifications from the primary master """ """ Handler that process the notifications from the primary master """
def _acceptIdentification(self, node, uuid, num_partitions, def notPrimaryMaster(self, *args):
num_replicas, your_uuid, primary, known_master_list): try:
app = self.app super(PrimaryNotificationsHandler, self).notPrimaryMaster(*args)
except PrimaryElected, e:
# Register new master nodes. app.primary_master_node, = e.args
found = False
conn_address = node.getAddress()
for node_address, node_uuid in known_master_list:
if node_address == conn_address:
assert uuid == node_uuid, (dump(uuid), dump(node_uuid))
found = True
n = app.nm.getByAddress(node_address)
if n is None:
n = app.nm.createMaster(address=node_address)
if node_uuid is not None and n.getUUID() != node_uuid:
n.setUUID(node_uuid)
assert found, (node, dump(uuid), known_master_list)
conn = node.getConnection()
if primary is not None:
primary_node = app.nm.getByAddress(primary)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
logging.warning('Unknown primary master: %s. Ignoring.',
primary)
return
else:
if app.trying_master_node is not primary_node:
app.trying_master_node = None
conn.close()
app.primary_master_node = primary_node
else:
if app.primary_master_node is not None:
# The primary master node is not a primary master node
# any longer.
app.primary_master_node = None
app.trying_master_node = None
conn.close()
return
# the master must give an UUID
if your_uuid is None:
raise ProtocolError('No UUID supplied')
app.uuid = your_uuid
logging.info('Got an UUID: %s', dump(app.uuid))
app.id_timestamp = None
# Always create partition table def _acceptIdentification(self, node, num_partitions, num_replicas):
app.pt = PartitionTable(num_partitions, num_replicas) self.app.pt = PartitionTable(num_partitions, num_replicas)
def answerLastTransaction(self, conn, ltid): def answerLastTransaction(self, conn, ltid):
app = self.app app = self.app
......
...@@ -42,13 +42,8 @@ class StorageEventHandler(MTEventHandler): ...@@ -42,13 +42,8 @@ class StorageEventHandler(MTEventHandler):
self.app.cp.removeConnection(node) self.app.cp.removeConnection(node)
super(StorageEventHandler, self).connectionFailed(conn) super(StorageEventHandler, self).connectionFailed(conn)
def _acceptIdentification(self, node, def _acceptIdentification(*args):
uuid, num_partitions, num_replicas, your_uuid, primary, pass
master_list):
assert self.app.master_conn is None or \
primary == self.app.master_conn.getAddress(), (
primary, self.app.master_conn)
assert uuid == node.getUUID(), (uuid, node.getUUID())
class StorageBootstrapHandler(AnswerBaseHandler): class StorageBootstrapHandler(AnswerBaseHandler):
""" Handler used when connecting to a storage node """ """ Handler used when connecting to a storage node """
......
...@@ -15,8 +15,9 @@ ...@@ -15,8 +15,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from . import logging from . import logging
from .exception import PrimaryElected
from .handler import EventHandler from .handler import EventHandler
from .protocol import uuid_str, Packets from .protocol import Packets
from .connection import ClientConnection from .connection import ClientConnection
...@@ -24,7 +25,6 @@ class BootstrapManager(EventHandler): ...@@ -24,7 +25,6 @@ class BootstrapManager(EventHandler):
""" """
Manage the bootstrap stage, lookup for the primary master then connect to it Manage the bootstrap stage, lookup for the primary master then connect to it
""" """
accepted = False
def __init__(self, app, node_type, server=None): def __init__(self, app, node_type, server=None):
""" """
...@@ -32,85 +32,30 @@ class BootstrapManager(EventHandler): ...@@ -32,85 +32,30 @@ class BootstrapManager(EventHandler):
primary master node, connect to it then returns when the master node primary master node, connect to it then returns when the master node
is ready. is ready.
""" """
self.primary = None
self.server = server self.server = server
self.node_type = node_type self.node_type = node_type
self.num_replicas = None self.num_replicas = None
self.num_partitions = None self.num_partitions = None
self.current = None
app.nm.reset() app.nm.reset()
uuid = property(lambda self: self.app.uuid) uuid = property(lambda self: self.app.uuid)
def announcePrimary(self, conn):
# We found the primary master early enough to be notified of election
# end. Lucky. Anyway, we must carry on with identification request, so
# nothing to do here.
pass
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
"""
Triggered when the network connection is successful.
Now ask who's the primary.
"""
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
self.current.setRunning()
conn.ask(Packets.RequestIdentification(self.node_type, self.uuid, conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.app.name, None)) self.server, self.app.name, None))
def connectionFailed(self, conn): def connectionFailed(self, conn):
"""
Triggered when the network connection failed.
Restart bootstrap.
"""
EventHandler.connectionFailed(self, conn) EventHandler.connectionFailed(self, conn)
self.current = None self.current = None
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
"""
Triggered when an established network connection is lost.
Restart bootstrap.
"""
self.current.setTemporarilyDown()
self.current = None self.current = None
def notReady(self, conn, message): def _acceptIdentification(self, node, num_partitions, num_replicas):
""" assert self.current is node, (self.current, node)
The primary master send this message when it is still not ready to
handle the client node.
Close connection and restart.
"""
conn.close()
def _acceptIdentification(self, node, uuid, num_partitions,
num_replicas, your_uuid, primary, known_master_list):
nm = self.app.nm
# Register new master nodes.
for address, uuid in known_master_list:
master_node = nm.getByAddress(address)
if master_node is None:
master_node = nm.createMaster(address=address)
master_node.setUUID(uuid)
self.primary = nm.getByAddress(primary)
if self.primary is None or self.current is not self.primary:
# three cases here:
# - something goes wrong (unknown UUID)
# - this master doesn't know who's the primary
# - got the primary's uuid, so cut here
node.getConnection().close()
return
logging.info('connected to a primary master node')
self.num_partitions = num_partitions self.num_partitions = num_partitions
self.num_replicas = num_replicas self.num_replicas = num_replicas
if self.uuid != your_uuid:
# got an uuid from the primary master
self.app.uuid = your_uuid
logging.info('Got a new UUID: %s', uuid_str(self.uuid))
self.app.id_timestamp = None
self.accepted = True
def getPrimaryConnection(self): def getPrimaryConnection(self):
""" """
...@@ -122,25 +67,26 @@ class BootstrapManager(EventHandler): ...@@ -122,25 +67,26 @@ class BootstrapManager(EventHandler):
poll = app.em.poll poll = app.em.poll
index = 0 index = 0
self.current = None self.current = None
conn = None
# retry until identified to the primary # retry until identified to the primary
while not self.accepted: while True:
if self.current is None: try:
# conn closed while self.current:
conn = None if self.current.isIdentified():
return (self.current, self.current.getConnection(),
self.num_partitions, self.num_replicas)
poll(1)
except PrimaryElected, e:
if self.current:
self.current.getConnection().close()
self.current, = e.args
index = app.nm.getMasterList().index(self.current)
else:
# select a master # select a master
master_list = app.nm.getMasterList() master_list = app.nm.getMasterList()
index = (index + 1) % len(master_list) index = (index + 1) % len(master_list)
self.current = master_list[index] self.current = master_list[index]
if conn is None: ClientConnection(app, self, self.current)
# open the connection # Note that the connection may be already closed. This happens when
conn = ClientConnection(app, self, self.current)
# Yes, the connection may be already closed. This happens when
# the kernel reacts so quickly to a closed port that 'connect' # the kernel reacts so quickly to a closed port that 'connect'
# fails on the first call. In such case, poll(1) would deadlock # fails on the first call. In such case, poll(1) would deadlock
# if there's no other connection to timeout. # if there's no other connection to timeout.
if conn.isClosed():
continue
# still processing
poll(1)
return self.current, conn, self.num_partitions, self.num_replicas
...@@ -259,10 +259,12 @@ class BaseConnection(object): ...@@ -259,10 +259,12 @@ class BaseConnection(object):
) )
def setHandler(self, handler): def setHandler(self, handler):
if self._handlers.setHandler(handler): changed = self._handlers.setHandler(handler)
logging.debug('Set handler %r on %r', handler, self) if changed:
logging.debug('Handler changed on %r', self)
else: else:
logging.debug('Delay handler %r on %r', handler, self) logging.debug('Delay handler %r on %r', handler, self)
return changed
def getUUID(self): def getUUID(self):
return None return None
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
class NeoException(Exception): class NeoException(Exception):
pass pass
class ElectionFailure(NeoException): class PrimaryElected(NeoException):
pass pass
class PrimaryFailure(NeoException): class PrimaryFailure(NeoException):
......
...@@ -19,9 +19,10 @@ from collections import deque ...@@ -19,9 +19,10 @@ from collections import deque
from operator import itemgetter from operator import itemgetter
from . import logging from . import logging
from .connection import ConnectionClosed from .connection import ConnectionClosed
from .protocol import ( from .exception import PrimaryElected
NodeStates, Packets, Errors, BackendNotImplemented, NonReadableCell, from .protocol import (NodeStates, NodeTypes, Packets, uuid_str,
NotReadyError, PacketMalformedError, ProtocolError, UnexpectedPacketError) Errors, BackendNotImplemented, NonReadableCell, NotReadyError,
PacketMalformedError, ProtocolError, UnexpectedPacketError)
from .util import cached_property from .util import cached_property
...@@ -147,16 +148,41 @@ class EventHandler(object): ...@@ -147,16 +148,41 @@ class EventHandler(object):
# Packet handlers. # Packet handlers.
def acceptIdentification(self, conn, node_type, *args): def notPrimaryMaster(self, conn, primary, known_master_list):
try: nm = self.app.nm
acceptIdentification = self._acceptIdentification for address in known_master_list:
except AttributeError: nm.createMaster(address=address)
raise UnexpectedPacketError('no handler found') if primary is not None:
node = self.app.nm.getByAddress(conn.getAddress()) primary = known_master_list[primary]
assert primary != self.app.server
raise PrimaryElected(nm.getByAddress(primary))
def _acceptIdentification(*args):
pass
def acceptIdentification(self, conn, node_type, uuid,
num_partitions, num_replicas, your_uuid):
app = self.app
node = app.nm.getByAddress(conn.getAddress())
assert node.getConnection() is conn, (node.getConnection(), conn) assert node.getConnection() is conn, (node.getConnection(), conn)
if node.getType() == node_type: if node.getType() == node_type:
if node_type == NodeTypes.MASTER:
other = app.nm.getByUUID(uuid)
if other is not None:
other.setUUID(None)
node.setUUID(uuid)
node.setRunning()
if your_uuid is None:
raise ProtocolError('No UUID supplied')
logging.info('connected to a primary master node')
if app.uuid != your_uuid:
app.uuid = your_uuid
logging.info('Got a new UUID: %s', uuid_str(your_uuid))
app.id_timestamp = None
elif node.getUUID() != uuid or app.uuid != your_uuid != None:
raise ProtocolError('invalid uuids')
node.setIdentified() node.setIdentified()
acceptIdentification(node, *args) self._acceptIdentification(node, num_partitions, num_replicas)
return return
conn.close() conn.close()
......
...@@ -423,7 +423,7 @@ class NodeManager(EventQueue): ...@@ -423,7 +423,7 @@ class NodeManager(EventQueue):
# lookup in current table # lookup in current table
node_by_uuid = self.getByUUID(uuid) node_by_uuid = self.getByUUID(uuid)
node_by_addr = self.getByAddress(addr) node_by_addr = self.getByAddress(addr)
node = node_by_uuid or node_by_addr node = node_by_addr or node_by_uuid
log_args = node_type, uuid_str(uuid), addr, state, id_timestamp log_args = node_type, uuid_str(uuid), addr, state, id_timestamp
if node is None: if node is None:
...@@ -434,10 +434,11 @@ class NodeManager(EventQueue): ...@@ -434,10 +434,11 @@ class NodeManager(EventQueue):
else: else:
assert isinstance(node, klass), 'node %r is not ' \ assert isinstance(node, klass), 'node %r is not ' \
'of expected type: %r' % (node, klass) 'of expected type: %r' % (node, klass)
assert None in (node_by_uuid, node_by_addr) or \ if None is not node_by_uuid is not node_by_addr is not None:
node_by_uuid is node_by_addr, \ assert added_list is not None, \
'Discrepancy between node_by_uuid (%r) and ' \ 'Discrepancy between node_by_uuid (%r) and ' \
'node_by_addr (%r)' % (node_by_uuid, node_by_addr) 'node_by_addr (%r)' % (node_by_uuid, node_by_addr)
node_by_uuid.setUUID(None)
if state == NodeStates.DOWN: if state == NodeStates.DOWN:
logging.debug('dropping node %r (%r), found with %s ' logging.debug('dropping node %r (%r), found with %s '
'%s %s %s %s', node, node.isConnected(), *log_args) '%s %s %s %s', node, node.isConnected(), *log_args)
......
...@@ -578,12 +578,14 @@ class PChecksum(PItem): ...@@ -578,12 +578,14 @@ class PChecksum(PItem):
def _decode(self, reader): def _decode(self, reader):
return reader(20) return reader(20)
class PUUID(PStructItemOrNone): class PSignedNull(PStructItemOrNone):
_fmt = '!l'
_None = Struct(_fmt).pack(0)
class PUUID(PSignedNull):
""" """
An UUID (node identifier, 4-bytes signed integer) An UUID (node identifier, 4-bytes signed integer)
""" """
_fmt = '!l'
_None = Struct(_fmt).pack(0)
class PTID(PItem): class PTID(PItem):
""" """
...@@ -715,13 +717,6 @@ class RequestIdentification(Packet): ...@@ -715,13 +717,6 @@ class RequestIdentification(Packet):
PNumber('num_partitions'), PNumber('num_partitions'),
PNumber('num_replicas'), PNumber('num_replicas'),
PUUID('your_uuid'), PUUID('your_uuid'),
PAddress('primary'),
PList('known_master_list',
PStruct('master',
PAddress('address'),
PUUID('uuid'),
),
),
) )
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
...@@ -742,15 +737,16 @@ class PrimaryMaster(Packet): ...@@ -742,15 +737,16 @@ class PrimaryMaster(Packet):
PUUID('primary_uuid'), PUUID('primary_uuid'),
) )
class AnnouncePrimary(Packet): class NotPrimaryMaster(Packet):
"""
Announce a primary master node election. PM -> SM.
"""
class ReelectPrimary(Packet):
""" """
Force a re-election of a primary master node. M -> M. Send list of known master nodes. SM -> Any.
""" """
_fmt = PStruct('not_primary_master',
PSignedNull('primary'),
PList('known_master_list',
PAddress('address'),
),
)
class Recovery(Packet): class Recovery(Packet):
""" """
...@@ -1687,10 +1683,8 @@ class Packets(dict): ...@@ -1687,10 +1683,8 @@ class Packets(dict):
Notify) Notify)
AskPrimary, AnswerPrimary = register( AskPrimary, AnswerPrimary = register(
PrimaryMaster) PrimaryMaster)
AnnouncePrimary = register( NotPrimaryMaster = register(
AnnouncePrimary) NotPrimaryMaster)
ReelectPrimary = register(
ReelectPrimary)
NotifyNodeInformation = register( NotifyNodeInformation = register(
NotifyNodeInformation) NotifyNodeInformation)
AskRecovery, AnswerRecovery = register( AskRecovery, AnswerRecovery = register(
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys, weakref import sys
from collections import defaultdict from collections import defaultdict
from time import time from time import time
...@@ -25,7 +25,7 @@ from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID ...@@ -25,7 +25,7 @@ from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.connection import ListeningConnection, ClientConnection from neo.lib.connection import ListeningConnection, ClientConnection
from neo.lib.exception import ElectionFailure, PrimaryFailure, StoppedOperation from neo.lib.exception import PrimaryElected, PrimaryFailure, StoppedOperation
class StateChangedException(Exception): pass class StateChangedException(Exception): pass
...@@ -40,8 +40,7 @@ def monotonic_time(): ...@@ -40,8 +40,7 @@ def monotonic_time():
return now return now
from .backup_app import BackupApplication from .backup_app import BackupApplication
from .handlers import election, identification, secondary from .handlers import identification, administration, client, master, storage
from .handlers import administration, client, storage
from .pt import PartitionTable from .pt import PartitionTable
from .recovery import RecoveryManager from .recovery import RecoveryManager
from .transactions import TransactionManager from .transactions import TransactionManager
...@@ -57,9 +56,21 @@ class Application(BaseApplication): ...@@ -57,9 +56,21 @@ class Application(BaseApplication):
backup_tid = None backup_tid = None
backup_app = None backup_app = None
truncate_tid = None truncate_tid = None
uuid = property(
lambda self: self._node.getUUID(), def uuid(self, uuid):
lambda self, uuid: self._node.setUUID(uuid)) node = self.nm.getByUUID(uuid)
if node is not self._node:
if node:
node.setUUID(None)
if node.isConnected(True):
node.getConnection().close()
self._node.setUUID(uuid)
uuid = property(lambda self: self._node.getUUID(), uuid)
@property
def election(self):
if self.primary and self.cluster_state == ClusterStates.RECOVERING:
return self.primary
def __init__(self, config): def __init__(self, config):
super(Application, self).__init__( super(Application, self).__init__(
...@@ -92,15 +103,7 @@ class Application(BaseApplication): ...@@ -92,15 +103,7 @@ class Application(BaseApplication):
logging.info('Name : %s', self.name) logging.info('Name : %s', self.name)
self.listening_conn = None self.listening_conn = None
self.primary = None
self.primary_master_node = None
self.cluster_state = None self.cluster_state = None
# election related data
self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set()
self.master_address_dict = weakref.WeakKeyDictionary()
self._current_manager = None self._current_manager = None
# backup # backup
...@@ -114,7 +117,8 @@ class Application(BaseApplication): ...@@ -114,7 +117,8 @@ class Application(BaseApplication):
self.administration_handler = administration.AdministrationHandler( self.administration_handler = administration.AdministrationHandler(
self) self)
self.secondary_master_handler = secondary.SecondaryMasterHandler(self) self.election_handler = master.ElectionHandler(self)
self.secondary_handler = master.SecondaryHandler(self)
self.client_service_handler = client.ClientServiceHandler(self) self.client_service_handler = client.ClientServiceHandler(self)
self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(self) self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(self)
self.storage_service_handler = storage.StorageServiceHandler(self) self.storage_service_handler = storage.StorageServiceHandler(self)
...@@ -146,96 +150,10 @@ class Application(BaseApplication): ...@@ -146,96 +150,10 @@ class Application(BaseApplication):
raise raise
def _run(self): def _run(self):
"""Make sure that the status is sane and start a loop."""
# Make a listening port.
self.listening_conn = ListeningConnection(self, None, self.server) self.listening_conn = ListeningConnection(self, None, self.server)
while True:
# Start a normal operation.
while self.cluster_state != ClusterStates.STOPPING:
# (Re)elect a new primary master.
self.primary = not self.nm.getMasterList()
if not self.primary:
self.electPrimary()
try:
if self.primary:
self.playPrimaryRole() self.playPrimaryRole()
else:
self.playSecondaryRole() self.playSecondaryRole()
raise RuntimeError, 'should not reach here'
except (ElectionFailure, PrimaryFailure):
# Forget all connections.
for conn in self.em.getClientList():
conn.close()
def electPrimary(self):
"""Elect a primary master node.
The difficulty is that a master node must accept connections from
others while attempting to connect to other master nodes at the
same time. Note that storage nodes and client nodes may connect
to self as well as master nodes."""
logging.info('begin the election of a primary master')
client_handler = election.ClientElectionHandler(self)
self.unconnected_master_node_set.clear()
self.negotiating_master_node_set.clear()
self.master_address_dict.clear()
self.listening_conn.setHandler(election.ServerElectionHandler(self))
getByAddress = self.nm.getByAddress
while True:
# handle new connected masters
for node in self.nm.getMasterList():
if node is self._node:
continue
node.setUnknown()
self.unconnected_master_node_set.add(node.getAddress())
# start the election process
self.primary = None
self.primary_master_node = None
try:
while (self.unconnected_master_node_set or
self.negotiating_master_node_set):
for addr in self.unconnected_master_node_set:
self.negotiating_master_node_set.add(addr)
ClientConnection(self, client_handler,
# XXX: Ugly, but the whole election code will be
# replaced soon
getByAddress(addr))
self.unconnected_master_node_set.clear()
self.em.poll(1)
except ElectionFailure, m:
# something goes wrong, clean then restart
logging.error('election failed: %s', m)
# Ask all connected nodes to reelect a single primary master.
for conn in self.em.getClientList():
conn.send(Packets.ReelectPrimary())
conn.abort()
# Wait until the connections are closed.
self.primary = None
self.primary_master_node = None
# XXX: Since poll does not wake up anymore every second,
# the following time condition should be reviewed.
# See also playSecondaryRole.
t = time() + 10
while self.em.getClientList() and time() < t:
try:
self.em.poll(1)
except ElectionFailure:
pass
# Close all connections.
for conn in self.em.getClientList() + self.em.getServerList():
conn.close()
else:
# election succeed, stop the process
self.primary = self.primary is None
break
def getNodeInformationDict(self, node_list): def getNodeInformationDict(self, node_list):
node_dict = defaultdict(list) node_dict = defaultdict(list)
...@@ -310,47 +228,25 @@ class Application(BaseApplication): ...@@ -310,47 +228,25 @@ class Application(BaseApplication):
def playPrimaryRole(self): def playPrimaryRole(self):
logging.info('play the primary role with %r', self.listening_conn) logging.info('play the primary role with %r', self.listening_conn)
self.master_address_dict.clear() self.primary_master = None
em = self.em for conn in self.em.getConnectionList():
packet = Packets.AnnouncePrimary()
for conn in em.getConnectionList():
if conn.isListening(): if conn.isListening():
conn.setHandler(identification.IdentificationHandler(self)) conn.setHandler(identification.IdentificationHandler(self))
else: else:
conn.send(packet) conn.close()
# Primary master should rather establish connections to all
# secondaries, rather than the other way around. This requires
# a bit more work when a new master joins a cluster but makes
# it easier to resolve UUID conflicts with minimal cluster
# impact, and ensure primary master uniqueness (primary masters
# become noisy, in that they actively try to maintain
# connections to all other master nodes, so duplicate
# primaries will eventually get in touch with each other and
# resolve the situation with a duel).
# TODO: only abort client connections, don't close server
# connections as we want to have them in the end. Secondary
# masters will reconnect nevertheless, but it's dirty.
# Currently, it's not trivial to preserve connected nodes,
# because of poor node status tracking during election.
# XXX: The above comment is partially wrong in that the primary
# master is now responsible of allocating node ids, and all
# other nodes must only create/update/remove nodes when
# processing node notification. We probably want to keep the
# current behaviour: having only server connections.
conn.abort()
# If I know any storage node, make sure that they are not in the # If I know any storage node, make sure that they are not in the
# running state, because they are not connected at this stage. # running state, because they are not connected at this stage.
for node in self.nm.getStorageList(): for node in self.nm.getStorageList():
if node.isRunning(): assert node.isTemporarilyDown(), node
node.setTemporarilyDown()
if self.uuid is None: if self.uuid is None:
self.uuid = self.getNewUUID(None, self.server, NodeTypes.MASTER) self.uuid = self.getNewUUID(None, self.server, NodeTypes.MASTER)
logging.info('My UUID: ' + uuid_str(self.uuid)) logging.info('My UUID: ' + uuid_str(self.uuid))
self._node.setRunning() self._node.setRunning()
self.primary = monotonic_time()
# Do not restart automatically if ElectionFailure is raised, in order # Do not restart automatically if an election happens, in order
# to avoid a split of the database. For example, with 2 machines with # to avoid a split of the database. For example, with 2 machines with
# a master and a storage on each one and replicas=1, the secondary # a master and a storage on each one and replicas=1, the secondary
# master becomes primary in case of network failure between the 2 # master becomes primary in case of network failure between the 2
...@@ -396,41 +292,91 @@ class Application(BaseApplication): ...@@ -396,41 +292,91 @@ class Application(BaseApplication):
except StateChangedException, e: except StateChangedException, e:
assert e.args[0] == ClusterStates.STOPPING assert e.args[0] == ClusterStates.STOPPING
self.shutdown() self.shutdown()
except PrimaryElected, e:
self.primary_master, = e.args
def playSecondaryRole(self): def playSecondaryRole(self):
""" """
I play a secondary role, thus only wait for a primary master to fail. A master play the secondary role when it is unlikely to win the
election (it lost against another master during identification
or it was notified that another is the primary master).
Its only task is to try again to become the primary master when the
latter fails. When connected to the cluster, the only communication is
with the primary master, to stay informed about removed/added master
nodes, and exit if requested.
""" """
logging.info('play the secondary role with %r', self.listening_conn) logging.info('play the secondary role with %r', self.listening_conn)
self.primary = None
# Wait for an announcement. If this is too long, probably handler = master.PrimaryHandler(self)
# the primary master is down. # The connection to the probably-primary master can be in any state
# XXX: Same remark as in electPrimary. # depending on how we were informed. The only case in which it can not
t = time() + 10 # be reused in when we have pending requests.
while self.primary_master_node is None: if self.primary_master.isConnected(True):
self.em.poll(1) master_conn = self.primary_master.getConnection()
if t < time(): # When we find the primary during identification, we don't attach
# election timeout # the connection (a server one) to any node, and it will be closed
raise ElectionFailure("Election timeout") # in the below 'for' loop.
self.master_address_dict.clear() assert master_conn.isClient(), master_conn
try:
# Restart completely. Non-optimized # We want the handler to be effective immediately.
# but lower level code needs to be stabilized first. # If it's not possible, let's just reconnect.
if not master_conn.setHandler(handler):
master_conn.close()
assert False
except PrimaryFailure:
master_conn = None
else:
master_conn = None
for conn in self.em.getConnectionList(): for conn in self.em.getConnectionList():
if not conn.isListening(): if conn.isListening():
conn.close() conn.setHandler(
# Reconnect to primary master node.
self.nm.reset()
primary_handler = secondary.PrimaryHandler(self)
ClientConnection(self, primary_handler, self.primary_master_node)
# and another for the future incoming connections
self.listening_conn.setHandler(
identification.SecondaryIdentificationHandler(self)) identification.SecondaryIdentificationHandler(self))
elif conn is not master_conn:
conn.close()
failed = {self.server}
poll = self.em.poll
while True: while True:
self.em.poll(1) try:
if master_conn is None:
for node in self.nm.getMasterList():
node.setTemporarilyDown()
node = self.primary_master
failed.add(node.getAddress())
if not node.isConnected(True):
# On immediate connection failure,
# PrimaryFailure is raised.
ClientConnection(self, handler, node)
else:
master_conn = None
while True:
poll(1)
except PrimaryFailure:
if self.primary_master.isRunning():
# XXX: What's the best to do here ? Another option is to
# choose the RUNNING master node with the lowest
# election key (i.e. (id_timestamp, address) as
# defined in IdentificationHandler), and return if we
# have the lowest one.
failed = {self.server}
else:
# Since the last primary failure (or since we play the
# secondary role), do not try any node more than once.
for self.primary_master in self.nm.getMasterList():
if self.primary_master.getAddress() not in failed:
break
else:
# All known master nodes are either down or secondary.
# Let's play the primary role again.
break
except PrimaryElected, e:
node = self.primary_master
self.primary_master, = e.args
assert node is not self.primary_master, node
try:
node.getConnection().close()
except PrimaryFailure:
pass
def runManager(self, manager_klass): def runManager(self, manager_klass):
self._current_manager = manager_klass(self) self._current_manager = manager_klass(self)
...@@ -459,9 +405,14 @@ class Application(BaseApplication): ...@@ -459,9 +405,14 @@ class Application(BaseApplication):
# change handlers # change handlers
notification_packet = Packets.NotifyClusterInformation(state) notification_packet = Packets.NotifyClusterInformation(state)
for node in self.nm.getIdentifiedList(): for node in self.nm.getList():
if not node.isConnected(True):
continue
conn = node.getConnection() conn = node.getConnection()
if node.isIdentified():
conn.send(notification_packet) conn.send(notification_packet)
elif conn.isServer():
continue
if node.isClient(): if node.isClient():
if state == ClusterStates.RUNNING: if state == ClusterStates.RUNNING:
handler = self.client_service_handler handler = self.client_service_handler
...@@ -471,6 +422,11 @@ class Application(BaseApplication): ...@@ -471,6 +422,11 @@ class Application(BaseApplication):
if state != ClusterStates.STOPPING: if state != ClusterStates.STOPPING:
conn.abort() conn.abort()
continue continue
elif node.isMaster():
if state == ClusterStates.RECOVERING:
handler = self.election_handler
else:
handler = self.secondary_handler
elif node.isStorage() and storage_handler: elif node.isStorage() and storage_handler:
handler = storage_handler handler = storage_handler
else: else:
...@@ -488,7 +444,9 @@ class Application(BaseApplication): ...@@ -488,7 +444,9 @@ class Application(BaseApplication):
return uuid return uuid
hob = UUID_NAMESPACES[node_type] hob = UUID_NAMESPACES[node_type]
for uuid in xrange((hob << 24) + 1, hob + 0x10 << 24): for uuid in xrange((hob << 24) + 1, hob + 0x10 << 24):
if uuid != self.uuid and getByUUID(uuid) is None: node = getByUUID(uuid)
if node is None or None is not address == node.getAddress():
assert uuid != self.uuid
return uuid return uuid
raise RuntimeError raise RuntimeError
...@@ -520,17 +478,19 @@ class Application(BaseApplication): ...@@ -520,17 +478,19 @@ class Application(BaseApplication):
logging.info("asking remaining nodes to shutdown") logging.info("asking remaining nodes to shutdown")
self.listening_conn.close() self.listening_conn.close()
handler = EventHandler(self) handler = EventHandler(self)
for node in self.nm.getConnectedList(): for node in self.nm.getList():
if not node.isConnected(True):
continue
conn = node.getConnection() conn = node.getConnection()
if node.isStorage():
conn.setHandler(handler) conn.setHandler(handler)
if not conn.connecting:
if node.isStorage():
conn.send(Packets.NotifyNodeInformation(monotonic_time(), (( conn.send(Packets.NotifyNodeInformation(monotonic_time(), ((
node.getType(), node.getAddress(), node.getUUID(), node.getType(), node.getAddress(), node.getUUID(),
NodeStates.TEMPORARILY_DOWN, None),))) NodeStates.TEMPORARILY_DOWN, None),)))
if conn.pending():
conn.abort() conn.abort()
elif conn.pending(): continue
conn.abort()
else:
conn.close() conn.close()
while self.em.connection_dict: while self.em.connection_dict:
......
...@@ -29,34 +29,9 @@ class MasterHandler(EventHandler): ...@@ -29,34 +29,9 @@ class MasterHandler(EventHandler):
if new is None: if new is None:
super(MasterHandler, self).connectionCompleted(conn) super(MasterHandler, self).connectionCompleted(conn)
def requestIdentification(self, conn, node_type, uuid, address, name, _): def connectionLost(self, conn, new_state=None):
self.checkClusterName(name) if self.app.listening_conn: # if running
app = self.app self._connectionLost(conn)
node = app.nm.getByUUID(uuid)
if node:
if node_type is NodeTypes.MASTER and not (
None != address == node.getAddress()):
raise ProtocolError
peer_uuid = self._setupNode(conn, node_type, uuid, address, node)
if app.primary:
primary_address = app.server
elif app.primary_master_node is not None:
primary_address = app.primary_master_node.getAddress()
else:
primary_address = None
known_master_list = []
for n in app.nm.getMasterList():
known_master_list.append((n.getAddress(), n.getUUID()))
conn.answer(Packets.AcceptIdentification(
NodeTypes.MASTER,
app.uuid,
app.pt.getPartitions(),
app.pt.getReplicas(),
peer_uuid,
primary_address,
known_master_list),
)
def askClusterState(self, conn): def askClusterState(self, conn):
state = self.app.getClusterState() state = self.app.getClusterState()
......
...@@ -22,12 +22,11 @@ from . import MasterHandler ...@@ -22,12 +22,11 @@ from . import MasterHandler
class ClientServiceHandler(MasterHandler): class ClientServiceHandler(MasterHandler):
""" Handler dedicated to client during service state """ """ Handler dedicated to client during service state """
def connectionLost(self, conn, new_state): def _connectionLost(self, conn):
# cancel its transactions and forgot the node # cancel its transactions and forgot the node
app = self.app app = self.app
if app.listening_conn: # if running
node = app.nm.getByUUID(conn.getUUID()) node = app.nm.getByUUID(conn.getUUID())
assert node is not None assert node is not None, conn
app.tm.clientLost(node) app.tm.clientLost(node)
node.setState(NodeStates.DOWN) node.setState(NodeStates.DOWN)
app.broadcastNodesInformation([node]) app.broadcastNodesInformation([node])
......
#
# Copyright (C) 2006-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.protocol import uuid_str, NodeTypes, Packets
from neo.lib.protocol import NotReadyError
from neo.lib.exception import ElectionFailure
from neo.lib.handler import EventHandler
from . import MasterHandler
class BaseElectionHandler(EventHandler):
    """Election logic common to both directions of a master-to-master link."""

    def _notifyNodeInformation(self, conn):
        """Do nothing: this handler sends no node information to the peer."""

    def reelectPrimary(self, conn):
        """Always fail the current election when the peer asks for a new one."""
        raise ElectionFailure('reelection requested')

    def announcePrimary(self, conn):
        """Register the peer behind `conn` as the primary master.

        Raises ElectionFailure when `app.primary` is already set, i.e. when
        this node considers itself primary as well.
        """
        app = self.app
        if app.primary:
            # I am also the primary... So restart the election.
            raise ElectionFailure('another primary arises')
        try:
            # Connections recorded by elect() are expected to be server ones.
            address = app.master_address_dict[conn]
            assert conn.isServer()
        except KeyError:
            # Not recorded: fall back on the address carried by the
            # connection itself, which is expected to be a client one.
            address = conn.getAddress()
            assert conn.isClient()
        app.primary = False
        elected = app.nm.getByAddress(address)
        app.primary_master_node = elected
        app.negotiating_master_node_set.clear()
        logging.info('%s is the primary', elected)

    def elect(self, conn, peer_address):
        """Compare our address with the peer's and end the negotiation.

        We give up the primary role when our address sorts before the
        peer's. If `conn` is given, the peer address is remembered for it.
        """
        app = self.app
        if app.server < peer_address:
            app.primary = False
        if conn is not None:
            app.master_address_dict[conn] = peer_address
        app.negotiating_master_node_set.discard(peer_address)
class ClientElectionHandler(BaseElectionHandler):
    """Election handler for connections this master opened to its peers."""

    def notifyNodeInformation(self, conn, timestamp, node_list):
        """Ignore node notifications received during the election."""
        # XXX: For the moment, do nothing because
        #      we'll close this connection and reconnect.

    def connectionFailed(self, conn):
        """Stop negotiating with a peer that could not be reached."""
        app = self.app
        peer_address = conn.getAddress()
        peer = app.nm.getByAddress(peer_address)
        assert peer is not None, (uuid_str(app.uuid), peer_address)
        # node may still be in unknown state
        app.negotiating_master_node_set.discard(peer_address)
        super(ClientElectionHandler, self).connectionFailed(conn)

    def connectionCompleted(self, conn):
        """Request identification as a master once the link is established."""
        app = self.app
        request = Packets.RequestIdentification(
            NodeTypes.MASTER,
            app.uuid,
            app.server,
            app.name,
            None,
        )
        conn.ask(request)
        super(ClientElectionHandler, self).connectionCompleted(conn)

    def connectionLost(self, conn, new_state):
        """Move the peer from the negotiating set back to the unconnected one."""
        # Retry connection. Either the node just died (and we will end up in
        # connectionFailed) or it just got elected (and we must not ignore
        # that node).
        app = self.app
        peer_address = conn.getAddress()
        app.unconnected_master_node_set.add(peer_address)
        app.negotiating_master_node_set.discard(peer_address)

    def _acceptIdentification(self, node, peer_uuid, num_partitions,
            num_replicas, your_uuid, primary, known_master_list):
        """Process the peer's answer to our identification request.

        Unknown masters listed by the peer are created in the node manager.
        If the peer names a primary we know, we adopt it and return;
        otherwise the election against the peer is decided by elect().

        Raises ElectionFailure when the announced primary conflicts with the
        one already recorded.
        """
        app = self.app
        # Register new master nodes.
        for address, uuid in known_master_list:
            if app.server == address:
                # This is self.
                assert node.getAddress() != primary or uuid == your_uuid, (
                    uuid_str(uuid), uuid_str(your_uuid))
            elif app.nm.getByAddress(address) is None:
                app.nm.createMaster(address=address)
        if primary is not None:
            # The primary master is defined.
            current = app.primary_master_node
            if current is not None and current.getAddress() != primary:
                # There are multiple primary master nodes. This is
                # dangerous.
                raise ElectionFailure('multiple primary master nodes')
            primary_node = app.nm.getByAddress(primary)
            if primary_node is not None:
                # Whatever the situation is, I trust this master.
                app.primary = False
                app.primary_master_node = primary_node
                # Stop waiting for connections other than the primary
                # master's to complete, to exit the election phase ASAP.
                app.negotiating_master_node_set.clear()
                return
            # I don't know such a node. Probably this information
            # is old. So ignore it.
            logging.warning('received an unknown primary node')
        self.elect(None, node.getAddress())
class ServerElectionHandler(BaseElectionHandler, MasterHandler):
    """Election handler for connections accepted from other masters."""

    def _setupNode(self, conn, node_type, uuid, address, node):
        """Accept only master peers, register them and run the election.

        Returns the uuid given by the peer, unchanged.
        Raises NotReadyError for any non-master node type.
        """
        if node_type != NodeTypes.MASTER:
            logging.info('reject a connection from a non-master')
            raise NotReadyError
        nm = self.app.nm
        # Create the master only if it is known neither by uuid (`node`)
        # nor by address.
        if node is None and nm.getByAddress(address) is None:
            nm.createMaster(address=address)
        self.elect(conn, address)
        return uuid
...@@ -15,27 +15,25 @@ ...@@ -15,27 +15,25 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import PrimaryElected
from neo.lib.handler import EventHandler
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, \ from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, \
NotReadyError, ProtocolError, uuid_str NotReadyError, Packets, ProtocolError, uuid_str
from ..app import monotonic_time from ..app import monotonic_time
from . import MasterHandler
class IdentificationHandler(MasterHandler): class IdentificationHandler(EventHandler):
def requestIdentification(self, conn, *args, **kw): def requestIdentification(self, conn, node_type, uuid,
super(IdentificationHandler, self).requestIdentification(conn, *args, address, name, id_timestamp):
**kw)
handler = conn.getHandler()
assert not isinstance(handler, IdentificationHandler), handler
handler._notifyNodeInformation(conn)
handler.connectionCompleted(conn, True)
def _setupNode(self, conn, node_type, uuid, address, node):
app = self.app app = self.app
self.checkClusterName(name)
if address == app.server:
raise ProtocolError('address conflict')
node = app.nm.getByUUID(uuid)
by_addr = address and app.nm.getByAddress(address) by_addr = address and app.nm.getByAddress(address)
while 1: while 1:
if by_addr: if by_addr:
if not by_addr.isConnected(): if not by_addr.isIdentified():
if node is by_addr: if node is by_addr:
break break
if not node or uuid < 0: if not node or uuid < 0:
...@@ -44,7 +42,7 @@ class IdentificationHandler(MasterHandler): ...@@ -44,7 +42,7 @@ class IdentificationHandler(MasterHandler):
node = by_addr node = by_addr
break break
elif node: elif node:
if node.isConnected(): if node.isIdentified():
if uuid < 0: if uuid < 0:
# The peer wants a temporary id that's already assigned. # The peer wants a temporary id that's already assigned.
# Let's give it another one. # Let's give it another one.
...@@ -78,7 +76,14 @@ class IdentificationHandler(MasterHandler): ...@@ -78,7 +76,14 @@ class IdentificationHandler(MasterHandler):
uuid is not None and node is not None) uuid is not None and node is not None)
human_readable_node_type = ' storage (%s) ' % (state, ) human_readable_node_type = ' storage (%s) ' % (state, )
elif node_type == NodeTypes.MASTER: elif node_type == NodeTypes.MASTER:
handler = app.secondary_master_handler if app.election:
if id_timestamp and \
(id_timestamp, address) < (app.election, app.server):
raise PrimaryElected(by_addr or
app.nm.createMaster(address=address))
handler = app.election_handler
else:
handler = app.secondary_handler
human_readable_node_type = ' master ' human_readable_node_type = ' master '
elif node_type == NodeTypes.ADMIN: elif node_type == NodeTypes.ADMIN:
handler = app.administration_handler handler = app.administration_handler
...@@ -95,22 +100,43 @@ class IdentificationHandler(MasterHandler): ...@@ -95,22 +100,43 @@ class IdentificationHandler(MasterHandler):
node.setUUID(uuid) node.setUUID(uuid)
node.id_timestamp = monotonic_time() node.id_timestamp = monotonic_time()
node.setState(state) node.setState(state)
node.setConnection(conn)
conn.setHandler(handler) conn.setHandler(handler)
node.setConnection(conn, not node.isIdentified())
app.broadcastNodesInformation([node], node) app.broadcastNodesInformation([node], node)
return uuid
class SecondaryIdentificationHandler(MasterHandler): conn.answer(Packets.AcceptIdentification(
NodeTypes.MASTER,
app.uuid,
app.pt.getPartitions(),
app.pt.getReplicas(),
uuid))
handler._notifyNodeInformation(conn)
handler.connectionCompleted(conn, True)
def announcePrimary(self, conn):
# If we received AnnouncePrimary on a client connection, we might have
# set this handler on server connection, and might receive
# AnnouncePrimary there too. As we cannot reach this without already
# handling a first AnnouncePrimary, we can safely ignore this one.
pass
def _setupNode(self, conn, node_type, uuid, address, node): class SecondaryIdentificationHandler(EventHandler):
# Nothing to do, storage will disconnect when it receives our answer.
# Primary will do the checks.
return uuid
def requestIdentification(self, conn, node_type, uuid,
                          address, name, id_timestamp):
    """Refuse an identification request while playing the secondary role.

    The peer is answered with NotPrimaryMaster (index of the primary in
    the list of known master addresses, or None) and the connection is
    aborted.

    Raises ProtocolError if the peer claims our own address, and
    PrimaryElected if a master peer sends a truthy `id_timestamp` while
    our presumed primary is not identified.
    """
    app = self.app
    self.checkClusterName(name)
    if address == app.server:
        raise ProtocolError('address conflict')
    presumed_primary = app.primary_master
    primary = presumed_primary.getAddress()
    if primary == address:
        # The peer is the node we consider primary: nothing to point it to.
        primary = None
    elif not presumed_primary.isIdentified():
        if node_type == NodeTypes.MASTER:
            peer = app.nm.createMaster(address=address)
            if id_timestamp:
                conn.close()
                raise PrimaryElected(peer)
        primary = None
    # For some cases, we rely on the fact that the remote will not retry
    # immediately (see SocketConnector.CONNECT_LIMIT).
    known_master_list = [master.getAddress()
                         for master in app.nm.getMasterList()]
    primary_index = primary and known_master_list.index(primary)
    conn.send(Packets.NotPrimaryMaster(primary_index, known_master_list))
    conn.abort()
...@@ -15,60 +15,76 @@ ...@@ -15,60 +15,76 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys import sys
from ..app import monotonic_time
from . import MasterHandler from . import MasterHandler
from neo.lib.handler import EventHandler from neo.lib.exception import PrimaryElected, PrimaryFailure
from neo.lib.exception import ElectionFailure, PrimaryFailure from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.protocol import NodeStates, NodeTypes, Packets, uuid_str
from neo.lib import logging
class SecondaryMasterHandler(MasterHandler):
""" Handler used by primary to handle secondary masters"""
def connectionLost(self, conn, new_state): class SecondaryHandler(MasterHandler):
"""Handler used by primary to handle secondary masters"""
def _connectionLost(self, conn):
app = self.app app = self.app
if app.listening_conn: # if running
node = app.nm.getByUUID(conn.getUUID()) node = app.nm.getByUUID(conn.getUUID())
node.setDown() node.setTemporarilyDown()
app.broadcastNodesInformation([node]) app.broadcastNodesInformation([node])
def announcePrimary(self, conn):
raise ElectionFailure, 'another primary arises'
def reelectPrimary(self, conn):
raise ElectionFailure, 'reelection requested'
class PrimaryHandler(EventHandler): class ElectionHandler(MasterHandler):
""" Handler used by secondaries to handle primary master""" """Handler used by primary to handle secondary masters during election"""
def connectionLost(self, conn, new_state): def connectionCompleted(self, conn, new=None):
self.connectionFailed(conn) if new is None:
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, app.election))
def connectionFailed(self, conn): def connectionFailed(self, conn):
self.app.primary_master_node.setDown() super(ElectionHandler, self).connectionFailed(conn)
if self.app.listening_conn: # if running self.connectionLost(conn)
def _acceptIdentification(self, node, *args):
    """Signal that `node` accepted our identification.

    Always raises PrimaryElected carrying `node`; the remaining answer
    fields are ignored.
    """
    elected = PrimaryElected(node)
    raise elected
def _connectionLost(self, *args):
if self.app.primary: # not switching to secondary role
self.app._current_manager.try_secondary = True
def notPrimaryMaster(self, *args):
    """Handle a NotPrimaryMaster answer while playing the primary role.

    PrimaryElected raised by the parent implementation is propagated only
    if the designated node is not identified; in every other case the
    current manager is asked to retry the other masters.
    """
    try:
        super(ElectionHandler, self).notPrimaryMaster(*args)
    except PrimaryElected as elected:
        designated = elected.args[0]
        # We keep playing the primary role when the peer does not
        # know yet that we won election against the returned node.
        if not designated.isIdentified():
            raise
    # There may be new master nodes. Connect to them.
    manager = self.app._current_manager
    manager.try_secondary = True
class PrimaryHandler(ElectionHandler):
"""Handler used by secondaries to handle primary master"""
def _acceptIdentification(self, node, num_partitions, num_replicas):
assert self.app.primary_master is node, (self.app.primary_master, node)
def _connectionLost(self, conn):
node = self.app.primary_master
# node is None when switching to primary role
if node and not node.isConnected(True):
raise PrimaryFailure('primary master is dead') raise PrimaryFailure('primary master is dead')
def connectionCompleted(self, conn): def notPrimaryMaster(self, *args):
app = self.app try:
addr = conn.getAddress() super(ElectionHandler, self).notPrimaryMaster(*args)
node = app.nm.getByAddress(addr) except PrimaryElected, e:
# connection successful, set it as running if e.args[0] is not self.app.primary_master:
node.setRunning() raise
conn.ask(Packets.RequestIdentification(
NodeTypes.MASTER,
app.uuid,
app.server,
app.name,
None,
))
super(PrimaryHandler, self).connectionCompleted(conn)
def reelectPrimary(self, conn):
raise ElectionFailure, 'reelection requested'
def notifyClusterInformation(self, conn, state): def notifyClusterInformation(self, conn, state):
self.app.cluster_state = state if state == ClusterStates.STOPPING:
sys.exit()
def notifyNodeInformation(self, conn, timestamp, node_list): def notifyNodeInformation(self, conn, timestamp, node_list):
super(PrimaryHandler, self).notifyNodeInformation( super(PrimaryHandler, self).notifyNodeInformation(
...@@ -77,17 +93,3 @@ class PrimaryHandler(EventHandler): ...@@ -77,17 +93,3 @@ class PrimaryHandler(EventHandler):
assert node_type == NodeTypes.MASTER, node_type assert node_type == NodeTypes.MASTER, node_type
if uuid == self.app.uuid and state == NodeStates.UNKNOWN: if uuid == self.app.uuid and state == NodeStates.UNKNOWN:
sys.exit() sys.exit()
def _acceptIdentification(self, node, uuid, num_partitions,
num_replicas, your_uuid, primary, known_master_list):
app = self.app
if primary != app.primary_master_node.getAddress():
raise PrimaryFailure('unexpected primary uuid')
if your_uuid != app.uuid:
app.uuid = your_uuid
logging.info('My UUID: ' + uuid_str(your_uuid))
node.setUUID(uuid)
app.id_timestamp = None
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.connection import ClientConnection
from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates
from .app import monotonic_time from .app import monotonic_time
from .handlers import MasterHandler from .handlers import MasterHandler
...@@ -47,6 +48,7 @@ class RecoveryManager(MasterHandler): ...@@ -47,6 +48,7 @@ class RecoveryManager(MasterHandler):
TID, and the last Partition Table ID from storage nodes, then get TID, and the last Partition Table ID from storage nodes, then get
back the latest partition table or make a new table from scratch, back the latest partition table or make a new table from scratch,
if this is the first time. if this is the first time.
A new primary master may also arise during this phase.
""" """
logging.info('begin the recovery of the status') logging.info('begin the recovery of the status')
app = self.app app = self.app
...@@ -54,9 +56,30 @@ class RecoveryManager(MasterHandler): ...@@ -54,9 +56,30 @@ class RecoveryManager(MasterHandler):
app.changeClusterState(ClusterStates.RECOVERING) app.changeClusterState(ClusterStates.RECOVERING)
pt.clear() pt.clear()
self.try_secondary = True
# collect the last partition table available # collect the last partition table available
poll = app.em.poll poll = app.em.poll
while 1: while 1:
if self.try_secondary:
# Keep trying to connect to all other known masters,
# to make sure there is a challenge between each pair
# of masters in the cluster. If we win, all connections
# opened here will be closed.
self.try_secondary = False
node_list = []
for node in app.nm.getMasterList():
if not (node is app._node or node.isConnected(True)):
# During recovery, master nodes are not put back in
# TEMPORARILY_DOWN state by handlers. This is done
# entirely in this method (here and after this poll
# loop), to minimize the notification packets.
if not node.isTemporarilyDown():
node.setTemporarilyDown()
node_list.append(node)
ClientConnection(app, app.election_handler, node)
if node_list:
app.broadcastNodesInformation(node_list)
poll(1) poll(1)
if pt.filled(): if pt.filled():
# A partition table exists, we are starting an existing # A partition table exists, we are starting an existing
...@@ -100,6 +123,17 @@ class RecoveryManager(MasterHandler): ...@@ -100,6 +123,17 @@ class RecoveryManager(MasterHandler):
for node in node_list: for node in node_list:
assert node.isPending(), node assert node.isPending(), node
node.setRunning() node.setRunning()
for node in app.nm.getMasterList():
if not (node is app._node or node.isIdentified()):
if node.isConnected(True):
node.getConnection().close()
assert node.isTemporarilyDown(), node
elif not node.isTemporarilyDown():
assert self.try_secondary, node
node.setTemporarilyDown()
node_list.append(node)
app.broadcastNodesInformation(node_list) app.broadcastNodesInformation(node_list)
if pt.getID() is None: if pt.getID() is None:
......
...@@ -42,7 +42,6 @@ from neo.tests.benchmark import BenchmarkRunner ...@@ -42,7 +42,6 @@ from neo.tests.benchmark import BenchmarkRunner
# each of them have to import its TestCase classes # each of them have to import its TestCase classes
UNIT_TEST_MODULES = [ UNIT_TEST_MODULES = [
# generic parts # generic parts
'neo.tests.testBootstrap',
'neo.tests.testConnection', 'neo.tests.testConnection',
'neo.tests.testHandler', 'neo.tests.testHandler',
'neo.tests.testNodes', 'neo.tests.testNodes',
...@@ -50,7 +49,6 @@ UNIT_TEST_MODULES = [ ...@@ -50,7 +49,6 @@ UNIT_TEST_MODULES = [
'neo.tests.testPT', 'neo.tests.testPT',
# master application # master application
'neo.tests.master.testClientHandler', 'neo.tests.master.testClientHandler',
'neo.tests.master.testElectionHandler',
'neo.tests.master.testMasterApp', 'neo.tests.master.testMasterApp',
'neo.tests.master.testMasterPT', 'neo.tests.master.testMasterPT',
'neo.tests.master.testRecovery', 'neo.tests.master.testRecovery',
......
...@@ -65,6 +65,5 @@ class IdentificationHandler(EventHandler): ...@@ -65,6 +65,5 @@ class IdentificationHandler(EventHandler):
node.setConnection(conn, app.uuid < uuid) node.setConnection(conn, app.uuid < uuid)
# accept the identification and trigger an event # accept the identification and trigger an event
conn.answer(Packets.AcceptIdentification(NodeTypes.STORAGE, uuid and conn.answer(Packets.AcceptIdentification(NodeTypes.STORAGE, uuid and
app.uuid, app.pt.getPartitions(), app.pt.getReplicas(), uuid, app.uuid, app.pt.getPartitions(), app.pt.getReplicas(), uuid))
app.master_node.getAddress(), ()))
handler.connectionCompleted(conn) handler.connectionCompleted(conn)
...@@ -309,10 +309,6 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -309,10 +309,6 @@ class NeoUnitTestBase(NeoTestBase):
""" Check if the ProtocolError exception was raised """ """ Check if the ProtocolError exception was raised """
self.assertRaises(protocol.ProtocolError, method, *args, **kwargs) self.assertRaises(protocol.ProtocolError, method, *args, **kwargs)
def checkNotReadyErrorRaised(self, method, *args, **kwargs):
    """ Assert that calling `method` with the given arguments raises
    NotReadyError """
    expected = protocol.NotReadyError
    self.assertRaises(expected, method, *args, **kwargs)
def checkAborted(self, conn): def checkAborted(self, conn):
""" Ensure the connection was aborted """ """ Ensure the connection was aborted """
self.assertEqual(len(conn.mockGetNamedCalls('abort')), 1) self.assertEqual(len(conn.mockGetNamedCalls('abort')), 1)
...@@ -330,16 +326,6 @@ class NeoUnitTestBase(NeoTestBase): ...@@ -330,16 +326,6 @@ class NeoUnitTestBase(NeoTestBase):
self._checkNoPacketSend(conn, 'answer') self._checkNoPacketSend(conn, 'answer')
self._checkNoPacketSend(conn, 'ask') self._checkNoPacketSend(conn, 'ask')
def checkUUIDSet(self, conn, uuid=None, check_intermediate=True):
    """ Ensure a UUID was set on the connection.

    The last recorded 'setUUID' call gives the reference value. With
    `check_intermediate`, every earlier call must have used the same
    value; when `uuid` is given, the reference value must equal it.
    """
    recorded = conn.mockGetNamedCalls('setUUID')
    last_uuid = recorded.pop().getParam(0)
    earlier_calls = recorded if check_intermediate else ()
    for call in earlier_calls:
        self.assertEqual(last_uuid, call.getParam(0))
    if uuid is not None:
        self.assertEqual(last_uuid, uuid)
# in check(Ask|Answer|Notify)Packet we return the packet so it can be used # in check(Ask|Answer|Notify)Packet we return the packet so it can be used
# in tests if more accurate checks are required # in tests if more accurate checks are required
......
...@@ -21,7 +21,6 @@ from .. import NeoUnitTestBase, buildUrlFromString ...@@ -21,7 +21,6 @@ from .. import NeoUnitTestBase, buildUrlFromString
from neo.client.app import Application from neo.client.app import Application
from neo.client.cache import test as testCache from neo.client.cache import test as testCache
from neo.client.exception import NEOStorageError from neo.client.exception import NEOStorageError
from neo.lib.protocol import NodeTypes, UUID_NAMESPACES
class ClientApplicationTests(NeoUnitTestBase): class ClientApplicationTests(NeoUnitTestBase):
...@@ -97,63 +96,6 @@ class ClientApplicationTests(NeoUnitTestBase): ...@@ -97,63 +96,6 @@ class ClientApplicationTests(NeoUnitTestBase):
# no packet sent # no packet sent
self.checkNoPacketSent(app.master_conn) self.checkNoPacketSent(app.master_conn)
def test_connectToPrimaryNode(self):
    """Drive _connectToPrimaryNode through a scripted sequence of asks."""
    # here we have three master nodes :
    # the connection to the first will fail
    # the second will have changed
    # the third will not be ready
    # after the third, the partition table will be operational
    # (as if it was connected to the primary master node)
    # will raise IndexError at the third iteration
    app = self.getApp('127.0.0.1:10010 127.0.0.1:10011')
    # TODO: test more connection failure cases

    def fake_master(port):
        # Stand-in for a master node listening on the given local port.
        return Mock({
            'getAddress': ('127.0.0.1', port),
            '__str__': 'Fake master node',
        })

    # do nothing for the first call
    # Case of an unknown primary_uuid (XXX: handler should probably raise,
    # it's not normal for a node to inform of a primary uuid without
    # telling us what its address is.)
    def unknown_primary(_):
        pass

    # first iteration : connection failed
    def connection_failed(_):
        app.trying_master_node = None

    # second iteration : master node changed
    def master_changed(_):
        app.primary_master_node = fake_master(10010)

    # third iteration : node not ready
    def node_not_ready(_):
        app.trying_master_node = None

    # fifth packet : request node identification succeeded
    def identification_succeeded(conn):
        app.master_conn = conn
        app.uuid = 1 + (UUID_NAMESPACES[NodeTypes.CLIENT] << 24)
        app.trying_master_node = app.primary_master_node = \
            fake_master(10011)

    # Sixth packet : askPartitionTable succeeded
    def partition_table_received(_):
        app.pt = Mock({'operational': True})

    # askLastTransaction
    def last_transaction_asked(_):
        pass

    # Each call to app._ask consumes the next step of this script.
    ask_func_list = [
        unknown_primary,
        connection_failed,
        master_changed,
        node_not_ready,
        identification_succeeded,
        partition_table_received,
        last_transaction_asked,
    ]

    def _ask_base(conn, _, handler=None):
        step = ask_func_list.pop(0)
        step(conn)
        app.nm.getByAddress(conn.getAddress())._connection = None

    app._ask = _ask_base
    # fake environment
    app.em.close()
    app.em = Mock({'getConnectionList': []})
    app.pt = Mock({'operational': False})
    app.start = lambda: None
    app.master_conn = app._connectToPrimaryNode()
    # The whole script must have been consumed.
    self.assertFalse(ask_func_list)
    self.assertTrue(app.master_conn is not None)
    self.assertTrue(app.pt.operational())
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -77,12 +77,13 @@ class MasterTests(NEOFunctionalTest): ...@@ -77,12 +77,13 @@ class MasterTests(NEOFunctionalTest):
killed_uuid_list = self.neo.killSecondaryMaster() killed_uuid_list = self.neo.killSecondaryMaster()
# Test sanity checks. # Test sanity checks.
self.assertEqual(len(killed_uuid_list), 1) self.assertEqual(len(killed_uuid_list), 1)
self.neo.expectMasterState(killed_uuid_list[0], None) self.neo.expectMasterState(killed_uuid_list[0],
self.assertEqual(len(self.neo.getMasterList()), 2) NodeStates.TEMPORARILY_DOWN)
self.assertEqual(len(self.neo.getMasterList()), MASTER_NODE_COUNT)
uuid, = self.neo.killPrimary() uuid, = self.neo.killPrimary()
# Check the state of the primary we just killed # Check the state of the primary we just killed
self.neo.expectMasterState(uuid, (None, NodeStates.UNKNOWN)) self.neo.expectMasterState(uuid, NodeStates.TEMPORARILY_DOWN)
# Check that a primary master arose. # Check that a primary master arose.
self.neo.expectPrimary(timeout=10) self.neo.expectPrimary(timeout=10)
# Check that the uuid really changed. # Check that the uuid really changed.
......
#
# Copyright (C) 2009-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from ..mock import Mock
from neo.lib import protocol
from .. import NeoUnitTestBase
from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.master.app import Application
from neo.master.handlers.election import ClientElectionHandler, \
ServerElectionHandler
from neo.lib.exception import ElectionFailure
from neo.lib.connection import ClientConnection
class MasterClientElectionTestBase(NeoUnitTestBase):
    """Helpers shared by the election handler test cases.

    Subclasses are expected to set `self.app` in their own setUp().
    """

    def setUp(self):
        super(MasterClientElectionTestBase, self).setUp()
        # Port handed to the next fake master node; incremented on each use
        # so that every node gets its own address.
        self._master_port = 3001

    def identifyToMasterNode(self):
        """Create a master node in the node manager, with a fake connection.

        Returns the (node, connection) pair.
        """
        node = self.app.nm.createMaster(uuid=self.getMasterUUID())
        port = self._master_port
        node.setAddress((self.local_ip, port))
        self._master_port = port + 1
        conn = self.getFakeConnection(
            address=node.getAddress(),
            uuid=node.getUUID(),
        )
        node.setConnection(conn)
        return node, conn

    def checkAcceptIdentification(self, conn):
        """Check that AcceptIdentification was answered on `conn`.

        Returns whatever checkAnswerPacket() returns for that packet type.
        """
        packet_type = Packets.AcceptIdentification
        return self.checkAnswerPacket(conn, packet_type)
class MasterClientElectionTests(MasterClientElectionTestBase):
    """Unit tests for ClientElectionHandler."""

    def setUp(self):
        super(MasterClientElectionTests, self).setUp()
        # create an application object
        config = self.getMasterConfiguration(master_number=1)
        self.app = Application(config)
        self.app.em.close()
        self.app.pt.clear()
        self.app.em = Mock()
        self.app.uuid = self.getMasterUUID()
        self.app.server = (self.local_ip, 10000)
        self.app.name = 'NEOCLUSTER'
        self.election = ClientElectionHandler(self.app)
        self.app.unconnected_master_node_set = set()
        self.app.negotiating_master_node_set = set()

    def _checkUnconnected(self, node):
        """Assert that the address of `node` is not being negotiated with."""
        self.assertNotIn(node.getAddress(),
                         self.app.negotiating_master_node_set)

    def _setNegociating(self, node):
        """Record the address of `node` in the negotiating set."""
        self._checkUnconnected(node)
        self.app.negotiating_master_node_set.add(node.getAddress())

    def _acceptMasterIdentification(self, conn, uuid, primary, master_list):
        """Feed the handler an AcceptIdentification coming from a master.

        The two literals are passed in the positions that follow the peer
        uuid and precede our own uuid.
        """
        self.election.acceptIdentification(
            conn,
            NodeTypes.MASTER,
            uuid,
            1,
            0,
            self.app.uuid,
            primary,
            master_list,
        )

    def _getMasterList(self, with_node=None):
        """Return (address, uuid) pairs for every known master node.

        `with_node` is accepted for compatibility and is not used.
        """
        master_list = self.app.nm.getMasterList()
        return [(x.getAddress(), x.getUUID()) for x in master_list]

    def test_connectionFailed(self):
        node, conn = self.identifyToMasterNode()
        self.assertTrue(node.isUnknown())
        self._checkUnconnected(node)
        self.election.connectionFailed(conn)
        self._checkUnconnected(node)
        self.assertTrue(node.isUnknown())

    def test_connectionCompleted(self):
        node, conn = self.identifyToMasterNode()
        self.assertTrue(node.isUnknown())
        self._checkUnconnected(node)
        self.election.connectionCompleted(conn)
        self._checkUnconnected(node)
        self.assertTrue(node.isUnknown())
        self.checkAskPacket(conn, Packets.RequestIdentification)

    def test_connectionClosed(self):
        node, conn = self.identifyToMasterNode()
        self._setNegociating(node)
        self.election.connectionClosed(conn)
        self.assertTrue(node.isUnknown())
        self._checkUnconnected(node)

    def test_acceptIdentification1(self):
        """ A non-master node accept identification """
        node, conn = self.identifyToMasterNode()
        args = (node.getUUID(), 0, 10, self.app.uuid, None,
            self._getMasterList())
        self.election.acceptIdentification(conn,
            NodeTypes.CLIENT, *args)
        # The negotiating set holds addresses: looking the node object up
        # in it could never fail, so check the address instead.
        self._checkUnconnected(node)
        self.checkClosed(conn)

    def test_acceptIdentificationDoesNotKnowPrimary(self):
        master1, master1_conn = self.identifyToMasterNode()
        master1_uuid = master1.getUUID()
        self._acceptMasterIdentification(
            master1_conn,
            master1_uuid,
            None,
            [(master1.getAddress(), master1_uuid)],
        )
        self.assertIsNone(self.app.primary_master_node)

    def test_acceptIdentificationKnowsPrimary(self):
        master1, master1_conn = self.identifyToMasterNode()
        master1_uuid = master1.getUUID()
        primary1 = master1.getAddress()
        self._acceptMasterIdentification(
            master1_conn,
            master1_uuid,
            primary1,
            [(master1.getAddress(), master1_uuid)],
        )
        self.assertIsNotNone(self.app.primary_master_node)

    def test_acceptIdentificationMultiplePrimaries(self):
        master1, master1_conn = self.identifyToMasterNode()
        master2, master2_conn = self.identifyToMasterNode()
        master3, _ = self.identifyToMasterNode()
        master1_uuid = master1.getUUID()
        master2_uuid = master2.getUUID()
        master3_uuid = master3.getUUID()
        master1_address = master1.getAddress()
        master2_address = master2.getAddress()
        master3_address = master3.getAddress()
        # First peer announces itself as the primary.
        self._acceptMasterIdentification(
            master1_conn,
            master1_uuid,
            master1_address,
            [(master1_address, master1_uuid)],
        )
        # Second peer announces a different primary: the election must fail.
        self.assertRaises(ElectionFailure, self._acceptMasterIdentification,
            master2_conn,
            master2_uuid,
            master3_address,
            [
                (master1_address, master1_uuid),
                (master2_address, master2_uuid),
                (master3_address, master3_uuid),
            ],
        )

    def test_acceptIdentification3(self):
        """ Identification accepted """
        node, conn = self.identifyToMasterNode()
        args = (node.getUUID(), 0, 10, self.app.uuid, None,
            self._getMasterList())
        self.election.acceptIdentification(conn, NodeTypes.MASTER, *args)
        self.checkUUIDSet(conn, node.getUUID())
        self.assertEqual(self.app.primary is False,
                         self.app.server < node.getAddress())
        # Membership is tested by address, which is what the set holds.
        self._checkUnconnected(node)
class MasterServerElectionTests(MasterClientElectionTestBase):
    """Unit tests for ServerElectionHandler."""

    def setUp(self):
        super(MasterServerElectionTests, self).setUp()
        # create an application object
        app = self.app = Application(
            self.getMasterConfiguration(master_number=1))
        app.em.close()
        app.pt.clear()
        app.name = 'NEOCLUSTER'
        app.em = Mock()
        self.election = ServerElectionHandler(app)
        app.unconnected_master_node_set = set()
        app.negotiating_master_node_set = set()
        for master in app.nm.getMasterList():
            master.setState(NodeStates.RUNNING)
        # define some variable to simulate client and storage node
        ip = self.local_ip
        self.client_address = ip, 1000
        self.storage_address = ip, 2000
        self.master_address = ip, 3000

    def test_requestIdentification1(self):
        """ A non-master node request identification """
        node, conn = self.identifyToMasterNode()
        request = (conn, NodeTypes.CLIENT,
                   node.getUUID(), node.getAddress(), self.app.name, None)
        self.assertRaises(protocol.NotReadyError,
                          self.election.requestIdentification, *request)

    def test_requestIdentification4(self):
        """ No conflict """
        node, conn = self.identifyToMasterNode()
        request = (conn, NodeTypes.MASTER,
                   node.getUUID(), node.getAddress(), self.app.name, None)
        self.election.requestIdentification(*request)
        self.checkUUIDSet(conn, node.getUUID())
        # The answer must decode into exactly seven fields; only the two
        # uuids are inspected.
        _, uuid, _, _, new_uuid, _, _ = \
            self.checkAcceptIdentification(conn).decode()
        self.assertEqual(node.getUUID(), new_uuid)
        self.assertNotEqual(node.getUUID(), uuid)

    def __getClient(self):
        """Register a client node and return a fake connection for it."""
        client_uuid = self.getClientUUID()
        conn = self.getFakeConnection(
            uuid=client_uuid, address=self.client_address)
        self.app.nm.createClient(
            uuid=client_uuid, address=self.client_address)
        return conn

    def testRequestIdentification1(self):
        """ Check with a non-master node, must be refused """
        conn = self.__getClient()
        request = (conn, NodeTypes.CLIENT,
                   conn.getUUID(), conn.getAddress(), self.app.name, None)
        self.checkNotReadyErrorRaised(
            self.election.requestIdentification, *request)

    def _requestIdentification(self):
        """Identify as a master and check the accepted answer.

        Returns the primary field of the decoded AcceptIdentification.
        """
        conn = self.getFakeConnection()
        peer_uuid = self.getMasterUUID()
        address = (self.local_ip, 2001)
        self.election.requestIdentification(
            conn, NodeTypes.MASTER, peer_uuid, address, self.app.name, None)
        answer = self.checkAcceptIdentification(conn).decode()
        (node_type, uuid, partitions, replicas, _peer_uuid, primary,
         master_list) = answer
        self.assertEqual(node_type, NodeTypes.MASTER)
        self.assertEqual(uuid, self.app.uuid)
        self.assertEqual(partitions, self.app.pt.getPartitions())
        self.assertEqual(replicas, self.app.pt.getReplicas())
        known_addresses = [entry[0] for entry in master_list]
        self.assertTrue(address in known_addresses)
        self.assertTrue(self.app.server in known_addresses)
        self.assertEqual(peer_uuid, _peer_uuid)
        return primary

    def testRequestIdentificationDoesNotKnowPrimary(self):
        self.app.primary = False
        self.app.primary_master_node = None
        answered_primary = self._requestIdentification()
        self.assertEqual(answered_primary, None)

    def testRequestIdentificationKnowsPrimary(self):
        self.app.primary = False
        primary = (self.local_ip, 3000)
        self.app.primary_master_node = Mock({'getAddress': primary})
        answered_primary = self._requestIdentification()
        self.assertEqual(answered_primary, primary)

    def testRequestIdentificationIsPrimary(self):
        self.app.primary = True
        primary = self.app.server
        self.app.primary_master_node = Mock({'getAddress': primary})
        answered_primary = self._requestIdentification()
        self.assertEqual(answered_primary, primary)

    def test_reelectPrimary(self):
        _, conn = self.identifyToMasterNode()
        self.assertRaises(ElectionFailure, self.election.reelectPrimary, conn)
# Run the test cases of this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
#
# Copyright (C) 2009-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from . import NeoUnitTestBase
from neo.storage.app import Application
from neo.lib.bootstrap import BootstrapManager
from neo.lib.protocol import NodeTypes, Packets
class BootstrapManagerTests(NeoUnitTestBase):
    """Unit tests for BootstrapManager, built on a storage Application."""

    def setUp(self):
        NeoUnitTestBase.setUp(self)
        self.prepareDatabase(number=1)
        # create an application object
        self.app = Application(self.getStorageConfiguration())
        self.bootstrap = BootstrapManager(self.app, NodeTypes.STORAGE)
        # define some variable to simulate client and storage node
        self.master_port, self.storage_port = 10010, 10020
        self.num_partitions, self.num_replicas = 1009, 2

    def _tearDown(self, success):
        self.app.close()
        del self.app
        super(BootstrapManagerTests, self)._tearDown(success)

    def _bootstrapWithMaster(self):
        """Make a freshly created master node the bootstrap's current node.

        Returns a fake connection bound to that master's address.
        """
        master_address = "127.0.0.1", self.master_port
        conn = self.getFakeConnection(address=master_address)
        self.bootstrap.current = self.app.nm.createMaster(
            address=master_address)
        return conn

    # Tests
    def testConnectionCompleted(self):
        conn = self._bootstrapWithMaster()
        self.bootstrap.connectionCompleted(conn)
        self.checkAskPacket(conn, Packets.RequestIdentification)

    def testHandleNotReady(self):
        # the primary is not ready
        conn = self._bootstrapWithMaster()
        self.bootstrap.notReady(conn, '')
        self.checkClosed(conn)
        self.checkNoPacketSent(conn)
# Run the test cases of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
...@@ -35,7 +35,7 @@ from neo.lib.connection import BaseConnection, \ ...@@ -35,7 +35,7 @@ from neo.lib.connection import BaseConnection, \
from neo.lib.connector import SocketConnector, ConnectorException from neo.lib.connector import SocketConnector, ConnectorException
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.locking import SimpleQueue from neo.lib.locking import SimpleQueue
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets from neo.lib.protocol import ClusterStates, Enum, NodeStates, NodeTypes, Packets
from neo.lib.util import cached_property, parseMasterList, p64 from neo.lib.util import cached_property, parseMasterList, p64
from .. import NeoTestBase, Patch, getTempDirectory, setupMySQLdb, \ from .. import NeoTestBase, Patch, getTempDirectory, setupMySQLdb, \
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX, DB_SOCKET, DB_USER ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX, DB_SOCKET, DB_USER
...@@ -745,27 +745,39 @@ class NEOCluster(object): ...@@ -745,27 +745,39 @@ class NEOCluster(object):
def __exit__(self, t, v, tb): def __exit__(self, t, v, tb):
self.stop(None) self.stop(None)
def start(self, storage_list=None, fast_startup=False): def start(self, storage_list=None, master_list=None, recovering=False):
self.started = True self.started = True
self._patch() self._patch()
self.neoctl = NeoCTL(self.admin.getVirtualAddress(), ssl=self.SSL) self.neoctl = NeoCTL(self.admin.getVirtualAddress(), ssl=self.SSL)
for node_type in 'master', 'admin': for node in self.master_list if master_list is None else master_list:
for node in getattr(self, node_type + '_list'): node.start()
for node in self.admin_list:
node.start() node.start()
Serialized.tic() Serialized.tic()
if fast_startup:
self.startCluster()
if storage_list is None: if storage_list is None:
storage_list = self.storage_list storage_list = self.storage_list
for node in storage_list: for node in storage_list:
node.start() node.start()
Serialized.tic() Serialized.tic()
if not fast_startup: if recovering:
expected_state = ClusterStates.RECOVERING
else:
self.startCluster() self.startCluster()
Serialized.tic() Serialized.tic()
expected_state = ClusterStates.RUNNING, ClusterStates.BACKINGUP
self.checkStarted(expected_state, storage_list)
def checkStarted(self, expected_state, storage_list=None):
if isinstance(expected_state, Enum.Item):
expected_state = expected_state,
state = self.neoctl.getClusterState() state = self.neoctl.getClusterState()
assert state in (ClusterStates.RUNNING, ClusterStates.BACKINGUP), state assert state in expected_state, state
self.enableStorageList(storage_list) expected_state = (NodeStates.PENDING
if state == ClusterStates.RECOVERING
else NodeStates.RUNNING)
for node in self.storage_list if storage_list is None else storage_list:
state = self.getNodeState(node)
assert state == expected_state, (node, state)
def stop(self, clear_database=False, __print_exc=traceback.print_exc, **kw): def stop(self, clear_database=False, __print_exc=traceback.print_exc, **kw):
if self.started: if self.started:
......
...@@ -34,8 +34,8 @@ from neo.lib.connection import ConnectionClosed, \ ...@@ -34,8 +34,8 @@ from neo.lib.connection import ConnectionClosed, \
from neo.lib.exception import DatabaseFailure, StoppedOperation from neo.lib.exception import DatabaseFailure, StoppedOperation
from neo.lib.handler import DelayEvent from neo.lib.handler import DelayEvent
from neo.lib import logging from neo.lib import logging
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \ from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes,
Packet, uuid_str, ZERO_OID, ZERO_TID Packets, Packet, uuid_str, ZERO_OID, ZERO_TID)
from .. import expectedFailure, Patch, TransactionalResource from .. import expectedFailure, Patch, TransactionalResource
from . import ClientApplication, ConnectionFilter, LockLock, NEOThreadedTest, \ from . import ClientApplication, ConnectionFilter, LockLock, NEOThreadedTest, \
RandomConflictDict, ThreadId, with_cluster RandomConflictDict, ThreadId, with_cluster
...@@ -837,12 +837,6 @@ class Test(NEOThreadedTest): ...@@ -837,12 +837,6 @@ class Test(NEOThreadedTest):
@with_cluster(master_count=3, partitions=10, replicas=1, storage_count=3) @with_cluster(master_count=3, partitions=10, replicas=1, storage_count=3)
def testShutdown(self, cluster): def testShutdown(self, cluster):
# BUG: Due to bugs in election, master nodes sometimes crash, or they
# declare themselves primary too quickly, but issues seem to be
# only reproducible with SSL enabled.
self._testShutdown(cluster)
def _testShutdown(self, cluster):
def before_finish(_): def before_finish(_):
# tell admin to shutdown the cluster # tell admin to shutdown the cluster
cluster.neoctl.setClusterState(ClusterStates.STOPPING) cluster.neoctl.setClusterState(ClusterStates.STOPPING)
...@@ -1225,12 +1219,10 @@ class Test(NEOThreadedTest): ...@@ -1225,12 +1219,10 @@ class Test(NEOThreadedTest):
@with_cluster(start_cluster=0, storage_count=3, autostart=3) @with_cluster(start_cluster=0, storage_count=3, autostart=3)
def testAutostart(self, cluster): def testAutostart(self, cluster):
def startCluster(orig): cluster.start(cluster.storage_list[:2], recovering=True)
getClusterState = cluster.neoctl.getClusterState
self.assertEqual(ClusterStates.RECOVERING, getClusterState())
cluster.storage_list[2].start() cluster.storage_list[2].start()
with Patch(cluster, startCluster=startCluster): self.tic()
cluster.start(cluster.storage_list[:2]) cluster.checkStarted(ClusterStates.RUNNING)
@with_cluster(storage_count=2, partitions=2) @with_cluster(storage_count=2, partitions=2)
def testAbortVotedTransaction(self, cluster): def testAbortVotedTransaction(self, cluster):
...@@ -2219,6 +2211,60 @@ class Test(NEOThreadedTest): ...@@ -2219,6 +2211,60 @@ class Test(NEOThreadedTest):
def testConflictAfterDeadlockWithSlowReplica2(self): def testConflictAfterDeadlockWithSlowReplica2(self):
self.testConflictAfterDeadlockWithSlowReplica1(True) self.testConflictAfterDeadlockWithSlowReplica1(True)
# Election scenario with three master nodes (m0, m1, m2).
# NOTE(review): indentation was lost in this view, so the extent of the
# nested `def` bodies and of the `with` blocks below cannot be determined
# from here -- confirm against the real file before relying on grouping.
@with_cluster(start_cluster=0, master_count=3)
def testElection(self, cluster):
m0, m1, m2 = cluster.master_list
# Start with m0 as the only master; recovering=True is passed so the
# cluster is not started (see the RECOVERING assertion further down).
cluster.start(master_list=(m0,), recovering=True)
getClusterState = cluster.neoctl.getClusterState
# Remove m0's listening connection from its event manager readers, then
# start the two other masters (presumably so that they cannot connect to
# m0 -- TODO confirm).
m0.em.removeReader(m0.listening_conn)
m1.start()
self.tic()
m2.start()
self.tic()
# At this point both m0 and m1 consider themselves primary; m2 does not.
self.assertTrue(m0.primary)
self.assertTrue(m1.primary)
self.assertFalse(m2.primary)
# Register m0's listening connection as a reader again and let events be
# processed while AcceptIdentification packets are delayed.
m0.em.addReader(m0.listening_conn)
with ConnectionFilter() as f:
f.delayAcceptIdentification()
self.tic()
self.tic()
# Only m0 is still primary, and the cluster has not left RECOVERING.
self.assertTrue(m0.primary)
self.assertFalse(m1.primary)
self.assertFalse(m2.primary)
self.assertEqual(getClusterState(), ClusterStates.RECOVERING)
cluster.startCluster()
# Helper: stop a node, wait for it through cluster.join() and reset it.
def stop(node):
node.stop()
cluster.join((node,))
node.resetNode()
# Stopping m1 (not primary): the cluster stays RUNNING under m0.
stop(m1)
self.tic()
self.assertEqual(getClusterState(), ClusterStates.RUNNING)
self.assertTrue(m0.primary)
self.assertFalse(m2.primary)
# Stopping the primary m0: the cluster stays RUNNING and m2 is primary.
stop(m0)
self.tic()
self.assertEqual(getClusterState(), ClusterStates.RUNNING)
self.assertTrue(m2.primary)
# Check for proper update of node ids on first NotifyNodeInformation.
stop(m2)
m0.start()
# Replacement for the storage node manager's update(): forwards to the
# original with node_list sorted in reverse order.
def update(orig, app, timestamp, node_list):
orig(app, timestamp, sorted(node_list, reverse=1))
with Patch(cluster.storage.nm, update=update):
with ConnectionFilter() as f:
# Filter RequestIdentification packets whose first decoded field is
# NodeTypes.STORAGE.
f.add(lambda conn, packet:
isinstance(packet, Packets.RequestIdentification)
and packet.decode()[0] == NodeTypes.STORAGE)
self.tic()
m2.start()
self.tic()
self.tic()
# Final state: cluster RUNNING, m0 primary, m2 not primary.
self.assertEqual(getClusterState(), ClusterStates.RUNNING)
self.assertTrue(m0.primary)
self.assertFalse(m2.primary)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
import unittest import unittest
from neo.lib.protocol import Packets from neo.lib.protocol import Packets
from .. import SSL from .. import SSL
from . import NEOCluster, with_cluster, test, testReplication from . import NEOCluster, test, testReplication
class SSLMixin: class SSLMixin:
...@@ -36,14 +36,6 @@ class SSLTests(SSLMixin, test.Test): ...@@ -36,14 +36,6 @@ class SSLTests(SSLMixin, test.Test):
testDeadlockAvoidance = None testDeadlockAvoidance = None
testUndoConflict = testUndoConflictDuringStore = None testUndoConflict = testUndoConflictDuringStore = None
if 1:
testShutdownWithSeveralMasterNodes = unittest.skip("fails randomly")(
test.Test.testShutdown.__func__)
@with_cluster(partitions=10, replicas=1, storage_count=3)
def testShutdown(self, cluster):
self._testShutdown(cluster)
def testAbortConnection(self, after_handshake=1): def testAbortConnection(self, after_handshake=1):
with self.getLoopbackConnection() as conn: with self.getLoopbackConnection() as conn:
conn.ask(Packets.Ping()) conn.ask(Packets.Ping())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment