Commit a3b0d7a8 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add a bootstrap manager that manages the primary master lookup, connection and

identification. It implements a bootstrap handler, taken from the storage
application. The connection process has changed to avoid any dead time during
connection, except when no master at all is available; this allows a quicker
connection from storage to master.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@849 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent b6d8438d
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from time import sleep
from neo.handler import EventHandler
from neo.node import MasterNode
from neo import protocol
from neo.pt import PartitionTable
from neo.util import dump
from neo.connection import ClientConnection
# Placeholder (address, port) advertised when the caller gives no server.
NO_SERVER = ('0.0.0.0', 0)


class BootstrapManager(EventHandler):
    """
    Manage the bootstrap stage: look up the primary master, then connect and
    identify to it.

    The manager is used as the handler of every connection it opens. Its
    callbacks record progress in ``self.current`` and ``self.primary``, and
    getPrimaryConnection() polls the event manager until those show that the
    node is identified to the primary master.
    """

    def __init__(self, app, name, uuid=protocol.INVALID_UUID, server=NO_SERVER):
        """
        app    -- application object; its ``em`` and ``nm`` attributes are
                  used by getPrimaryConnection() (it is handed to
                  EventHandler.__init__, which presumably stores it as
                  ``self.app`` -- confirm against neo.handler)
        name   -- name sent in the identification request
        uuid   -- UUID sent in the identification request
        server -- (address, port) pair sent in the identification request
        """
        EventHandler.__init__(self, app)
        # master node reported as primary by the last AnswerPrimaryMaster
        self.primary = None
        # master node currently being tried; None means "attempt over, retry"
        self.current = None
        self.server = server
        self.uuid = uuid
        self.name = name
        # filled in by handleAcceptNodeIdentification
        self.num_partitions = None
        self.num_replicas = None

    def connectionCompleted(self, conn):
        """Once connected, ask the peer which master is the primary."""
        EventHandler.connectionCompleted(self, conn)
        conn.ask(protocol.askPrimaryMaster())

    def connectionFailed(self, conn):
        """Mark the current attempt as over so that the lookup retries."""
        EventHandler.connectionFailed(self, conn)
        self.current = None

    def connectionClosed(self, conn):
        """Mark the current attempt as over so that the lookup retries."""
        EventHandler.connectionClosed(self, conn)
        self.current = None

    def timeoutExpired(self, conn):
        """Mark the current attempt as over so that the lookup retries."""
        EventHandler.timeoutExpired(self, conn)
        self.current = None

    def peerBroken(self, conn):
        """Mark the current attempt as over so that the lookup retries."""
        EventHandler.peerBroken(self, conn)
        self.current = None

    def handleNotReady(self, conn, packet, message):
        """Give up on this master and close the connection."""
        # masters are still electing one of them
        self.current = None
        conn.close()

    def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
        """
        Register the masters listed by the peer, then either request
        identification (the peer is the primary) or close the connection.

        known_master_list -- iterable of (ip_address, port, uuid) triples
        """
        nm = self.app.nm
        # Register new master nodes.
        # TODO: this job should be done by the node manager
        for ip_address, port, uuid in known_master_list:
            addr = (ip_address, port)
            node = nm.getNodeByServer(addr)
            if node is None:
                node = MasterNode(server=addr)
                nm.add(node)
            node.setUUID(uuid)
        self.primary = nm.getNodeByUUID(primary_uuid)
        if self.primary is None or self.current is not self.primary:
            # three cases here:
            # - something goes wrong (unknown UUID)
            # - this master doesn't know who's the primary
            # - got the primary's uuid, so cut here
            self.current = None
            conn.close()
            return
        logging.info('connected to a primary master node')
        conn.ask(protocol.requestNodeIdentification(protocol.STORAGE_NODE_TYPE,
            self.uuid, self.server[0], self.server[1], self.name))

    def handleAcceptNodeIdentification(self, conn, packet, node_type,
            uuid, ip_address, port, num_partitions, num_replicas, your_uuid):
        """
        Record the partition/replica counts and the UUID given by the master,
        then tag the connection with the master's UUID.
        """
        self.num_partitions = num_partitions
        self.num_replicas = num_replicas
        if self.uuid != your_uuid:
            # got an uuid from the primary master
            self.uuid = your_uuid
            logging.info('Got a new UUID : %s', dump(self.uuid))
        # getPrimaryConnection() stops once the connection carries the
        # primary's UUID
        conn.setUUID(uuid)

    def getPrimaryConnection(self, connector_handler):
        """
        Poll until a connection identified to the primary master exists.

        Starts with the first master known to the node manager (the list must
        not be empty: its element 0 is read) and moves to the next one each
        time an attempt ends. Sleeps one second whenever the index wraps back
        to the start of the list.

        Returns a tuple (node, conn, uuid, num_partitions, num_replicas).
        """
        logging.info('connecting to a primary master node')
        em, nm = self.app.em, self.app.nm
        index = 0
        self.current = nm.getMasterNodeList()[0]
        conn = None
        # retry until identified to the primary; the "conn is None" test keeps
        # the guard safe when a primary is already recorded but no connection
        # has been opened yet
        while conn is None or self.primary is None \
                or conn.getUUID() != self.primary.getUUID():
            if self.current is None:
                # conn closed
                conn = None
                # select a master
                master_list = nm.getMasterNodeList()
                index = (index + 1) % len(master_list)
                self.current = master_list[index]
                if index == 0:
                    # tried all known masters, sleep a bit
                    sleep(1)
            if conn is None:
                # open the connection
                addr = self.current.getServer()
                conn = ClientConnection(em, self, addr, connector_handler)
            # still processing
            em.poll(1)
        node = nm.getNodeByUUID(conn.getUUID())
        return (node, conn, self.uuid, self.num_partitions, self.num_replicas)
......@@ -35,6 +35,7 @@ from neo.storage.replicator import Replicator
from neo.connector import getConnectorHandler
from neo.pt import PartitionTable
from neo.util import dump
from neo.bootstrap import BootstrapManager
class Application(object):
"""The storage node application."""
......@@ -142,12 +143,9 @@ class Application(object):
# start the operation. This cycle will be executed permanently,
# until the user explicitly requests a shutdown.
while 1:
self.operational = False
# look for the primary master
self.connectToPrimaryMaster()
assert self.master_conn is not None
if self.uuid == INVALID_UUID:
raise RuntimeError, 'No UUID supplied from the primary master'
self.operational = False
try:
while 1:
try:
......@@ -175,63 +173,40 @@ class Application(object):
Note that I do not accept any connection from non-master nodes
at this stage."""
logging.info('connecting to a primary master node')
# Reload a partition table from the database. This is necessary
# when a previous primary master died while sending a partition
# table, because the table might be incomplete.
if self.pt is not None:
self.loadPartitionTable()
self.ptid = self.dm.getPTID()
# bootstrap handler, only for outgoing connections
handler = handlers.BootstrapHandler(self)
em = self.em
nm = self.nm
pt = self.pt
# First of all, make sure that I have no connection.
for conn in em.getConnectionList():
for conn in self.em.getConnectionList():
if not isinstance(conn, ListeningConnection):
conn.close()
index = 0
self.trying_master_node = None
t = 0
while 1:
em.poll(1)
if self.trying_master_node is None:
if t + 1 < time():
# Choose a master node to connect to.
if self.primary_master_node is not None:
# If I know a primary master node, pinpoint it.
self.trying_master_node = self.primary_master_node
else:
# Otherwise, check one by one.
master_list = nm.getMasterNodeList()
try:
self.trying_master_node = master_list[index]
except IndexError:
index = 0
self.trying_master_node = master_list[0]
index += 1
ClientConnection(em, handler, \
addr = self.trying_master_node.getServer(),
connector_handler = self.connector_handler)
t = time()
elif self.primary_master_node is self.trying_master_node:
# If I know which is a primary master node, check if
# I have a connection to it already.
for conn in em.getConnectionList():
if isinstance(conn, ClientConnection):
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if node is self.primary_master_node:
# Yes, I have.
conn.setHandler(handlers.VerificationHandler(self))
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, self.name, self.uuid, self.server)
data = bootstrap.getPrimaryConnection(self.connector_handler)
(node, conn, uuid, num_partitions, num_replicas) = data
self.master_node = node
self.master_conn = conn
return
self.uuid = uuid
self.dm.setUUID(uuid)
# Reload a partition table from the database. This is necessary
# when a previous primary master died while sending a partition
# table, because the table might be incomplete.
if pt is not None:
self.loadPartitionTable()
self.ptid = self.dm.getPTID()
if num_partitions != pt.getPartitions():
raise RuntimeError('the number of partitions is inconsistent')
if pt is None or pt.getReplicas() != num_replicas:
# changing number of replicas is not an issue
self.num_partitions = num_partitions
self.num_replicas = num_replicas
self.dm.setNumPartitions(self.num_partitions)
self.dm.setNumReplicas(self.num_replicas)
self.pt = PartitionTable(num_partitions, num_replicas)
self.loadPartitionTable()
self.ptid = self.dm.getPTID()
def verifyData(self):
"""Verify data under the control by a primary master node.
......
......@@ -254,7 +254,6 @@ from neo.storage.handlers.identification import IdentificationHandler
from neo.storage.handlers.initialization import InitializationHandler
from neo.storage.handlers.verification import VerificationHandler
from neo.storage.handlers.replication import ReplicationHandler
from neo.storage.handlers.bootstrap import BootstrapHandler
from neo.storage.handlers.storage import StorageOperationHandler
from neo.storage.handlers.master import MasterOperationHandler
from neo.storage.handlers.client import ClientOperationHandler
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo.storage.handlers import BaseStorageHandler
from neo.protocol import INVALID_UUID, MASTER_NODE_TYPE, STORAGE_NODE_TYPE
from neo.node import MasterNode
from neo import protocol
from neo.pt import PartitionTable
from neo.util import dump
class BootstrapHandler(BaseStorageHandler):
    """This class deals with events for a bootstrap phase.

    It keeps two attributes of the application up to date:
    ``app.trying_master_node`` (the master currently being contacted, reset
    to None when the attempt ends) and ``app.primary_master_node`` (the
    master believed to be the primary).
    """

    def connectionCompleted(self, conn):
        """Ask the peer which master is the primary, then run the base hook."""
        conn.ask(protocol.askPrimaryMaster())
        BaseStorageHandler.connectionCompleted(self, conn)

    def connectionFailed(self, conn):
        """Forget the attempted master, and the primary if it was the one tried."""
        app = self.app
        if app.trying_master_node is app.primary_master_node:
            # Tried to connect to a primary master node and failed.
            # So this would effectively mean that it is dead.
            app.primary_master_node = None
        app.trying_master_node = None
        BaseStorageHandler.connectionFailed(self, conn)

    def timeoutExpired(self, conn):
        """Forget the attempted master, and the primary if it was the one tried."""
        app = self.app
        if app.trying_master_node is app.primary_master_node:
            # If a primary master node timeouts, I should not rely on it.
            app.primary_master_node = None
        app.trying_master_node = None
        BaseStorageHandler.timeoutExpired(self, conn)

    def connectionClosed(self, conn):
        """Forget the attempted master, and the primary if it was the one tried."""
        app = self.app
        if app.trying_master_node is app.primary_master_node:
            # If a primary master node closes, I should not rely on it.
            app.primary_master_node = None
        app.trying_master_node = None
        BaseStorageHandler.connectionClosed(self, conn)

    def peerBroken(self, conn):
        """Forget the attempted master, and the primary if it was the one tried."""
        app = self.app
        if app.trying_master_node is app.primary_master_node:
            # If a primary master node gets broken, I should not rely
            # on it.
            app.primary_master_node = None
        app.trying_master_node = None
        BaseStorageHandler.peerBroken(self, conn)

    def handleNotReady(self, conn, packet, message):
        """Drop the current attempt and close the connection."""
        app = self.app
        if app.trying_master_node is not None:
            app.trying_master_node = None
        conn.close()

    def handleAcceptNodeIdentification(self, conn, packet, node_type,
                                       uuid, ip_address, port,
                                       num_partitions, num_replicas, your_uuid):
        """Validate the peer's identification and adopt its parameters.

        The peer is removed from the node manager and the connection closed
        when it is not a master or when the address it announces differs
        from the connection's address. Otherwise the partition and replica
        counts and the UUID assigned to this node are stored on the
        application and in its ``dm`` object, and the connection is tagged
        with the peer's UUID.

        Raises RuntimeError when the announced number of partitions differs
        from the one already known while the number of replicas is unchanged.
        """
        app = self.app
        node = app.nm.getNodeByServer(conn.getAddress())
        if node_type != MASTER_NODE_TYPE:
            # The peer is not a master node!
            logging.error('%s:%d is not a master node', ip_address, port)
            app.nm.remove(node)
            conn.close()
            return
        if conn.getAddress() != (ip_address, port):
            # The server address is different! Then why was
            # the connection successful?
            logging.error('%s:%d is waiting for %s:%d',
                          conn.getAddress()[0], conn.getAddress()[1],
                          ip_address, port)
            app.nm.remove(node)
            conn.close()
            return
        if app.num_partitions is None or app.num_replicas is None or \
                app.num_replicas != num_replicas:
            # changing number of replicas is not an issue
            # Store the new counts, then rebuild the partition table from
            # them and reload it.
            app.num_partitions = num_partitions
            app.dm.setNumPartitions(app.num_partitions)
            app.num_replicas = num_replicas
            app.dm.setNumReplicas(app.num_replicas)
            app.pt = PartitionTable(num_partitions, num_replicas)
            app.loadPartitionTable()
            app.ptid = app.dm.getPTID()
        elif app.num_partitions != num_partitions:
            raise RuntimeError('the number of partitions is inconsistent')
        if your_uuid != INVALID_UUID and app.uuid != your_uuid:
            # got an uuid from the primary master
            app.uuid = your_uuid
            app.dm.setUUID(app.uuid)
            logging.info('Got a new UUID from master : %s' % dump(app.uuid))
        conn.setUUID(uuid)
        #node.setUUID(uuid)
        # Node UUID was set in handleAnswerPrimaryMaster
        assert node.getUUID() == uuid

    def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
                                  known_master_list):
        """Register the announced masters and react to the primary's identity.

        known_master_list is iterated as (ip_address, port, uuid) triples.
        The identification request is sent only when the connection has not
        been closed by the checks below.
        """
        closed = False
        app = self.app
        # Register new master nodes.
        for ip_address, port, uuid in known_master_list:
            addr = (ip_address, port)
            n = app.nm.getNodeByServer(addr)
            if n is None:
                n = MasterNode(server = addr)
                app.nm.add(n)
            if uuid != INVALID_UUID:
                # If I don't know the UUID yet, believe what the peer
                # told me at the moment.
                if n.getUUID() is None or n.getUUID() != uuid:
                    n.setUUID(uuid)
        if primary_uuid != INVALID_UUID:
            primary_node = app.nm.getNodeByUUID(primary_uuid)
            if primary_node is None:
                # I don't know such a node. Probably this information
                # is old. So ignore it.
                logging.warning('Unknown primary master UUID: %s. Ignoring.' % dump(primary_uuid))
            else:
                app.primary_master_node = primary_node
                if app.trying_master_node is primary_node:
                    # I am connected to the right one.
                    logging.info('connected to a primary master node')
                else:
                    # The peer is not the primary: end this attempt.
                    app.trying_master_node = None
                    conn.close()
                    closed = True
        else:
            if app.primary_master_node is not None:
                # The primary master node is not a primary master node
                # any longer.
                app.primary_master_node = None
            # NOTE(review): indentation was lost in the text this block was
            # recovered from. The three statements below are assumed to run
            # whenever the peer reports no primary, not only when a primary
            # was previously recorded -- confirm against the repository.
            app.trying_master_node = None
            conn.close()
            closed = True
        if not closed:
            p = protocol.requestNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
                                        app.server[0], app.server[1], app.name)
            conn.ask(p)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment