Commit 6be00df7 authored by Julien Muchembled's avatar Julien Muchembled

neoctl: change 'set node' command into 'kill', which kills a node safely

parent bbf16947
......@@ -69,9 +69,6 @@ RC - Review output of pylint (CODE)
should be made a singleton (saves the CPU time needed to instanciates all
the copies - often when a connection is established, saves the memory
used by each copy).
- Consider replace setNodeState admin packet by one per action, like
dropNode to reduce packet processing complexity and reduce bad actions
like set a node in TEMPORARILY_DOWN state.
- Review node notfications. Eg. A storage don't have to be notified of new
clients but only when one is lost.
- Review transactional isolation of various methods
......
......@@ -53,21 +53,6 @@ class AdminEventHandler(EventHandler):
p = Packets.AnswerNodeList(node_information_list)
conn.answer(p)
@check_primary_master
def setNodeState(self, conn, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s", uuid_str(uuid), state)
node = self.app.nm.getByUUID(uuid)
if node is None:
raise protocol.ProtocolError('invalid uuid')
if node.getState() == state and modify_partition_table is False:
# no change
p = Errors.Ack('no change')
conn.answer(p)
return
# forward to primary master node
p = Packets.SetNodeState(uuid, state, modify_partition_table)
self.app.master_conn.ask(p, conn=conn, msg_id=conn.getPeerId())
@check_primary_master
def askClusterState(self, conn):
conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
......@@ -80,6 +65,7 @@ class AdminEventHandler(EventHandler):
addPendingNodes = forward_ask(Packets.AddPendingNodes)
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
checkReplicas = forward_ask(Packets.CheckReplicas)
......
......@@ -26,7 +26,7 @@ except ImportError:
pass
# The protocol version (major, minor).
PROTOCOL_VERSION = (12, 1)
PROTOCOL_VERSION = (13, 1)
# Size restrictions.
MIN_PACKET_SIZE = 10
......@@ -1135,7 +1135,6 @@ class SetNodeState(Packet):
_fmt = PStruct('set_node_state',
PUUID('uuid'),
PFNodeState,
PBoolean('modify_partition_table'),
)
_answer = Error
......
......@@ -247,7 +247,7 @@ class Application(object):
# send at most one non-empty notification packet per node
for node in self.nm.getIdentifiedList():
node_list = node_dict.get(node.getType(), [])
node_list = node_dict.get(node.getType())
if node_list and node.isRunning():
node.notify(Packets.NotifyNodeInformation(node_list))
......
......@@ -20,8 +20,8 @@ from . import MasterHandler
from ..app import StateChangedException
from neo.lib import logging
from neo.lib.pt import PartitionTableException
from neo.lib.protocol import ClusterStates, NodeStates, Packets, ProtocolError
from neo.lib.protocol import Errors, uuid_str
from neo.lib.protocol import ClusterStates, Errors, \
NodeStates, NodeTypes, Packets, ProtocolError, uuid_str
from neo.lib.util import dump
CLUSTER_STATE_WORKFLOW = {
......@@ -32,6 +32,10 @@ CLUSTER_STATE_WORKFLOW = {
ClusterStates.STOPPING_BACKUP: (ClusterStates.BACKINGUP,
ClusterStates.STARTING_BACKUP),
}
NODE_STATE_WORKFLOW = {
NodeTypes.MASTER: (NodeStates.UNKNOWN,),
NodeTypes.STORAGE: (NodeStates.UNKNOWN, NodeStates.DOWN),
}
class AdministrationHandler(MasterHandler):
"""This class deals with messages from the admin node only"""
......@@ -72,38 +76,24 @@ class AdministrationHandler(MasterHandler):
if state != app.cluster_state:
raise StateChangedException(state)
def setNodeState(self, conn, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s : %s",
uuid_str(uuid), state, modify_partition_table)
def setNodeState(self, conn, uuid, state):
logging.info("set node state for %s: %s", uuid_str(uuid), state)
app = self.app
node = app.nm.getByUUID(uuid)
if node is None:
raise ProtocolError('unknown node')
if state not in NODE_STATE_WORKFLOW.get(node.getType(), ()):
raise ProtocolError('can not switch node to this state')
if uuid == app.uuid:
node.setState(state)
# get message for self
if state != NodeStates.RUNNING:
p = Errors.Ack('node state changed')
conn.answer(p)
app.shutdown()
raise ProtocolError('can not kill primary master node')
if node.getState() == state:
# no change, just notify admin node
p = Errors.Ack('node already in %s state' % state)
conn.answer(p)
return
if state == NodeStates.RUNNING:
# first make sure to have a connection to the node
if not node.isConnected():
raise ProtocolError('no connection to the node')
node.setState(state)
elif state == NodeStates.DOWN and node.isStorage():
state_changed = state != node.getState()
message = ('state changed' if state_changed else
'node already in %s state' % state)
if node.isStorage():
keep = state == NodeStates.UNKNOWN
try:
cell_list = app.pt.dropNodeList([node],
not modify_partition_table)
cell_list = app.pt.dropNodeList([node], keep)
except PartitionTableException, e:
raise ProtocolError(str(e))
node.setState(state)
......@@ -112,16 +102,22 @@ class AdministrationHandler(MasterHandler):
node.notify(Packets.NotifyNodeInformation([node.asTuple()]))
# close to avoid handle the closure as a connection lost
node.getConnection().abort()
if not modify_partition_table:
if keep:
cell_list = app.pt.outdate()
elif cell_list:
message = 'node permanently removed'
app.broadcastPartitionChanges(cell_list)
else:
node.setState(state)
# /!\ send the node information *after* the partition table change
p = Errors.Ack('state changed')
conn.answer(p)
conn.answer(Errors.Ack(message))
if state_changed:
# notify node explicitly because broadcastNodesInformation()
# ignores non-running nodes
assert not node.isRunning()
if node.isConnected():
node.notify(Packets.NotifyNodeInformation([node.asTuple()]))
app.broadcastNodesInformation([node])
def addPendingNodes(self, conn, uuid_list):
......
......@@ -14,10 +14,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
from . import MasterHandler
from neo.lib.handler import EventHandler
from neo.lib.exception import ElectionFailure, PrimaryFailure
from neo.lib.protocol import NodeTypes, Packets, uuid_str
from neo.lib.protocol import NodeStates, NodeTypes, Packets, uuid_str
from neo.lib import logging
class SecondaryMasterHandler(MasterHandler):
......@@ -72,7 +73,8 @@ class PrimaryHandler(EventHandler):
if node_type != NodeTypes.MASTER:
# No interest.
continue
if uuid == app.uuid and state == NodeStates.UNKNOWN:
sys.exit()
# Register new master nodes.
if app.server == addr:
# This is self.
......
......@@ -17,7 +17,7 @@
from operator import itemgetter
from .neoctl import NeoCTL, NotReadyException
from neo.lib.util import bin, p64
from neo.lib.protocol import uuid_str, ClusterStates, NodeStates, NodeTypes, \
from neo.lib.protocol import uuid_str, ClusterStates, NodeTypes, \
UUID_NAMESPACES, ZERO_TID
action_dict = {
......@@ -28,7 +28,6 @@ action_dict = {
'primary': 'getPrimary',
},
'set': {
'node': 'setNodeState',
'cluster': 'setClusterState',
},
'check': 'checkReplicas',
......@@ -36,6 +35,7 @@ action_dict = {
'add': 'enableStorageList',
'tweak': 'tweakPartitionTable',
'drop': 'dropNode',
'kill': 'killNode',
}
uuid_int = (lambda ns: lambda uuid:
......@@ -50,9 +50,6 @@ class TerminalNeoCTL(object):
self.neoctl.close()
# Utility methods (could be functions)
def asNodeState(self, value):
return getattr(NodeStates, value.upper())
def asNodeType(self, value):
return getattr(NodeTypes, value.upper())
......@@ -121,25 +118,6 @@ class TerminalNeoCTL(object):
assert len(params) == 0
return str(self.neoctl.getClusterState())
def setNodeState(self, params):
"""
Set node state, and allow (or not) updating partition table.
Parameters: node state [update]
node: node to modify
state: state to put the node in
update: disallow (0, default) or allow (other integer) partition
table to be updated
"""
assert len(params) in (2, 3)
node = self.asNode(params[0])
state = self.asNodeState(params[1])
if len(params) == 3:
update_partition_table = bool(int(params[2]))
else:
update_partition_table = False
return self.neoctl.setNodeState(node, state,
update_partition_table=update_partition_table)
def setClusterState(self, params):
"""
Set cluster state.
......@@ -181,16 +159,19 @@ class TerminalNeoCTL(object):
"""
return self.neoctl.tweakPartitionTable(map(self.asNode, params))
def killNode(self, params):
"""
Kill redundant nodes (either a storage or a secondary master).
Parameters: node
"""
return self.neoctl.killNode(self.asNode(*params))
def dropNode(self, params):
"""
Set node into DOWN state.
Remove storage node permanently.
Parameters: node
node: node the pu into DOWN state
Equivalent to:
set node state (node) DOWN
"""
assert len(params) == 1
return self.neoctl.dropNode(self.asNode(params[0]))
return self.neoctl.dropNode(self.asNode(*params))
def getPrimary(self, params):
"""
......
......@@ -100,16 +100,11 @@ class NeoCTL(object):
raise RuntimeError(response)
return response[2]
def setNodeState(self, node, state, update_partition_table=False):
def _setNodeState(self, node, state):
"""
Set node state, and allow (or not) updating partition table.
Kill node, or remove it permanently
"""
if update_partition_table:
update_partition_table = 1
else:
update_partition_table = 0
packet = Packets.SetNodeState(node, state, update_partition_table)
response = self.__ask(packet)
response = self.__ask(Packets.SetNodeState(node, state))
if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
raise RuntimeError(response)
return response[2]
......@@ -151,12 +146,11 @@ class NeoCTL(object):
"""
return self.setClusterState(ClusterStates.VERIFYING)
def killNode(self, node):
return self._setNodeState(node, NodeStates.UNKNOWN)
def dropNode(self, node):
"""
Set node into "down" state and remove it from partition table.
"""
return self.setNodeState(node, NodeStates.DOWN,
update_partition_table=1)
return self._setNodeState(node, NodeStates.DOWN)
def getPrimary(self):
"""
......
......@@ -44,7 +44,7 @@ class BaseMasterHandler(EventHandler):
# This is me, do what the master tell me
logging.info("I was told I'm %s", state)
if state in (NodeStates.DOWN, NodeStates.TEMPORARILY_DOWN,
NodeStates.BROKEN):
NodeStates.BROKEN, NodeStates.UNKNOWN):
erase = state == NodeStates.DOWN
self.app.shutdown(erase=erase)
elif state == NodeStates.HIDDEN:
......
......@@ -629,6 +629,12 @@ class NEOCluster(object):
return current_try, current_try
self.expectCondition(callback, *args, **kw)
def expectDead(self, process, *args, **kw):
def callback(last_try):
current_try = not process.isAlive()
return current_try, current_try
self.expectCondition(callback, *args, **kw)
def expectStorageNotKnown(self, process, *args, **kw):
# /!\ Not Known != Unknown
process_uuid = process.getUUID()
......
......@@ -41,12 +41,14 @@ class MasterTests(NEOFunctionalTest):
self.neo.expectAllMasters(MASTER_NODE_COUNT)
# Kill
killed_uuid_list = self.neo.killSecondaryMaster()
# Test sanity check.
self.assertEqual(len(killed_uuid_list), 1)
uuid = killed_uuid_list[0]
# Check node state has changed.
self.neo.expectMasterState(uuid, None)
primary_uuid = self.neoctl.getPrimary()
for master in self.neo.getMasterProcessList():
uuid = master.getUUID()
if uuid != primary_uuid:
break
self.neo.neoctl.killNode(uuid)
self.neo.expectDead(master)
self.assertRaises(RuntimeError, self.neo.neoctl.killNode, primary_uuid)
def testStoppingPrimaryWithTwoSecondaries(self):
# Wait for masters to stabilize
......
......@@ -179,13 +179,15 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectRunning(started[2])
self.neo.expectOudatedCells(number=0)
started[0].stop()
self.neo.neoctl.killNode(started[0].getUUID())
# Cluster still operational. All cells of first storage should be
# outdated.
self.neo.expectUnavailable(started[0])
self.neo.expectOudatedCells(2)
self.neo.expectClusterRunning()
self.assertRaises(RuntimeError, self.neo.neoctl.killNode,
started[1].getUUID())
started[1].stop()
# Cluster not operational anymore. Only cells of second storage that
# were shared with the third one should become outdated.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment