Commit 2b321c02 authored by Julien Muchembled

WIP: Make admin node a web-app

The goal is to get rid of the neoctl command-line tool, and to manage the
cluster via a web browser, or with tools like 'wget'. Then, it will be
possible to provide a web user interface to connect to the underlying DB of
any storage node, which usually requires a SQL client.

The design of the admin app is finished:
- it's threaded like clients
- it's a WSGI app (see the sketch below)
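
Since the whole admin app is exposed as one WSGI callable (a Bottle
application built per instance by the ObjectBottle descriptor below), it is
not tied to the bundled wsgiref server. A minimal sketch of serving it,
assuming a 'config' object like the one Application() already takes; the
address and port are examples only:

    # Sketch only: Application.serve() does the equivalent internally via
    # bottle.run(server='wsgiref', handler_class=RequestHandler, quiet=1).
    from wsgiref.simple_server import make_server
    from neo.admin.app import Application

    app = Application(config)  # 'config' is the usual admin configuration
    app.start()                # start the I/O polling thread
    # app.bottle is a Bottle instance, i.e. a standard WSGI callable
    make_server('127.0.0.1', 9999, app.bottle).serve_forever()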

I also hacked an HTTP API together as quickly as possible to make all tests
pass.
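
For example, with the routes as currently defined, the cluster can already
be driven with plain GET requests; a sketch, assuming an admin node listening
on localhost:9999 (the address is an example):

    # Equivalent to: wget -qO- http://localhost:9999/getClusterState
    from urllib import urlopen

    print urlopen('http://localhost:9999/getClusterState').read()
    # JSON endpoints, e.g. listing storage nodes:
    print urlopen('http://localhost:9999/getNodeList?node_type=STORAGE').read()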

TODO:
- define a better HTTP API
- there's no UI at all yet
- remove all unused packets from the protocol (those that were only used
  between neoctl and admin node)
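
Note that the NeoCTL class survives as a thin HTTP client wrapping these
routes, so existing Python callers keep the same API. A hypothetical session
(the address is an example):

    from neo.neoctl.app import NeoCTL

    neoctl = NeoCTL(('127.0.0.1', 9999))
    print neoctl.getClusterState()  # e.g. RUNNING
    neoctl.tweakPartitionTable()    # issues GET /tweakPartitionTable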

There are a few dead files, not deleted yet, in case they still contain some
useful pieces of code:
 neo/neoctl/app.py
 neo/neoctl/handler.py
 neo/scripts/neoctl.py
parent 9bd14bf2
@@ -14,123 +14,202 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
+import json
+import thread
+import weakref
+from bottle import Bottle, HTTPError, request, response
+from copy import deepcopy
+from logging import ERROR, INFO
+from wsgiref.simple_server import WSGIRequestHandler
+from . import handler
 from neo.lib import logging
-from neo.lib.app import BaseApplication
-from neo.lib.connection import ListeningConnection
 from neo.lib.exception import PrimaryFailure
-from .handler import AdminEventHandler, MasterEventHandler, \
-    MasterRequestEventHandler
 from neo.lib.bootstrap import BootstrapManager
 from neo.lib.pt import PartitionTable
-from neo.lib.protocol import ClusterStates, Errors, \
-    NodeTypes, NodeStates, Packets
-from neo.lib.debug import register as registerLiveDebugger
+from neo.lib.protocol import ClusterStates, NodeTypes, NodeStates, Packets
+from neo.lib.threaded_app import ThreadedApplication
+from neo.lib.util import p64, tidFromTime
 
-class Application(BaseApplication):
-    """The storage node application."""
 
-    def __init__(self, config):
-        super(Application, self).__init__(
-            config.getSSL(), config.getDynamicMasterList())
-        for address in config.getMasters():
-            self.nm.createMaster(address=address)
-        self.name = config.getCluster()
-        self.server = config.getBind()
-        logging.debug('IP address is %s, port is %d', *self.server)
+def raiseNotReady():
+    raise HTTPError(503, 'Not connected to a primary master')
 
-        # The partition table is initialized after getting the number of
-        # partitions.
-        self.pt = None
-        self.uuid = config.getUUID()
-        self.request_handler = MasterRequestEventHandler(self)
-        self.master_event_handler = MasterEventHandler(self)
-        self.cluster_state = None
-        self.reset()
-        registerLiveDebugger(on_log=self.log)
 
-    def close(self):
-        self.listening_conn = None
-        super(Application, self).close()
+class ObjectBottle(object):
 
-    def reset(self):
-        self.bootstrapped = False
-        self.master_conn = None
-        self.master_node = None
+    def __init__(self, weakref=False, *args, **kw):
+        self._app = Bottle(*args, **kw)
+        self._weakref = weakref
 
-    def log(self):
-        self.em.log()
-        self.nm.log()
-        if self.pt is not None:
-            self.pt.log()
+    def __getattr__(self, name):
+        return getattr(self._app, name)
 
-    def run(self):
-        try:
-            self._run()
-        except Exception:
-            logging.exception('Pre-mortem data:')
-            self.log()
-            logging.flush()
-            raise
+    def __get__(self, obj, cls):
+        if obj is None: return self
+        app = obj.bottle = deepcopy(self._app)
+        if self._weakref:
+            obj = weakref.ref(obj)
+            app.install(lambda f: lambda *a, **k: f(obj(), *a, **k))
+        else:
+            app.install(lambda f: lambda *a, **k: f(obj, *a, **k))
+        return app
+
+
+class RequestHandler(WSGIRequestHandler):
+
+    def _log(self, level, format, *args):
+        logging.log(level, "%s %s", self.client_address[0], format % args)
+
+    def log_error(self, *args):
+        self._log(ERROR, *args)
+
+    def log_message(self, *args):
+        self._log(INFO, *args)
+
+
+class Application(ThreadedApplication):
+    """The storage node application."""
+
+    bottle = ObjectBottle(weakref=True)
+    cluster_state = None
+
+    def __init__(self, config):
+        super(Application, self).__init__(
+            config.getMasters(), config.getCluster(),
+            ssl=config.getSSL(),
+            dynamic_master_list=config.getDynamicMasterList())
+        self.master_event_handler = handler.MasterEventHandler(self)
+        self.notifications_handler = handler.MasterNotificationsHandler(self)
+        self.primary_handler = handler.PrimaryAnswersHandler(self)
 
     def _run(self):
         """Make sure that the status is sane and start a loop."""
-        if len(self.name) == 0:
-            raise RuntimeError, 'cluster name must be non-empty'
-
-        # Make a listening port.
-        handler = AdminEventHandler(self)
-        self.listening_conn = ListeningConnection(self, handler, self.server)
-
-        while self.cluster_state != ClusterStates.STOPPING:
-            self.connectToPrimary()
-            try:
-                while True:
-                    self.em.poll(1)
-            except PrimaryFailure:
-                logging.error('primary master is down')
-        self.listening_conn.close()
-        while not self.em.isIdle():
-            self.em.poll(1)
+        try:
+            poll = self.em.poll
+            while self.cluster_state != ClusterStates.STOPPING:
+                self.connectToPrimary()
+                try:
+                    while True:
+                        poll(1)
+                except PrimaryFailure:
+                    self.nm.log()
+                    logging.error('primary master is down')
+                finally:
+                    self.master_conn = None
+            while not self.em.isIdle():
+                poll(1)
+        finally:
+            self.interrupt_main()
+
+    interrupt_main = staticmethod(thread.interrupt_main)
 
     def connectToPrimary(self):
         """Find a primary master node, and connect to it.
 
         If a primary master node is not elected or ready, repeat
         the attempt of a connection periodically.
-
-        Note that I do not accept any connection from non-master nodes
-        at this stage.
         """
-        self.master_node = None
-        self.uuid = None
        self.cluster_state = None
         # search, find, connect and identify to the primary master
-        bootstrap = BootstrapManager(self, self.name, NodeTypes.ADMIN,
-            self.uuid, self.server)
-        data = bootstrap.getPrimaryConnection()
-        (node, conn, uuid, num_partitions, num_replicas) = data
-        self.master_node = node
-        self.master_conn = conn
-        self.uuid = uuid
-
-        if self.pt is None:
-            self.pt = PartitionTable(num_partitions, num_replicas)
-        elif self.pt.getPartitions() != num_partitions:
-            # XXX: shouldn't we recover instead of raising ?
-            raise RuntimeError('the number of partitions is inconsistent')
-        elif self.pt.getReplicas() != num_replicas:
-            # XXX: shouldn't we recover instead of raising ?
-            raise RuntimeError('the number of replicas is inconsistent')
-
-        # passive handler
+        bootstrap = BootstrapManager(self, self.name, NodeTypes.ADMIN)
+        (self.master_node, self.master_conn, self.uuid,
+         num_partitions, num_replicas) = bootstrap.getPrimaryConnection()
+        self.pt = PartitionTable(num_partitions, num_replicas)
         self.master_conn.setHandler(self.master_event_handler)
         self.master_conn.ask(Packets.AskClusterState())
         self.master_conn.ask(Packets.AskNodeInformation())
         self.master_conn.ask(Packets.AskPartitionTable())
+        self.master_conn.setHandler(self.notifications_handler)
+        self.master_conn.convertToMT(self.dispatcher)
 
-    def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
-        # we have a pt
-        self.pt.log()
+    def _askPrimary(self, packet, **kw):
+        """ Send a request to the primary master and process its answer """
+        return self._ask(self._getMasterConnection(), packet,
+            handler=self.primary_handler, **kw)
+
+    def _getMasterConnection(self):
+        conn = self.master_conn
+        if conn is None or conn.isClosed():
+            raiseNotReady()
+        return conn
+
+    def serve(self, **kw):
+        self.start()
+        self.bottle.run(server='wsgiref', handler_class=RequestHandler,
+            quiet=1, **kw)
+
+    def asTID(self, value):
+        if '.' in value:
+            return tidFromTime(float(value))
+        return p64(int(value, 0))
+
+    @bottle.route('/getClusterState')
+    def getClusterState(self):
+        if self.cluster_state is not None:
+            return str(self.cluster_state)
+
+    def _setClusterState(self, state):
+        self._askPrimary(Packets.SetClusterState(state))
+
+    @bottle.route('/setClusterState')
+    def setClusterState(self):
+        self._setClusterState(getattr(ClusterStates, request.query.state))
+
+    @bottle.route('/startCluster')
+    def startCluster(self):
+        self._setClusterState(ClusterStates.VERIFYING)
+
+    @bottle.route('/enableStorageList')
+    def enableStorageList(self):
+        node_list = request.query.node_list
+        self._askPrimary(Packets.AddPendingNodes(map(int,
+            request.query.node_list.split(',')) if node_list else ()))
+
+    @bottle.route('/tweakPartitionTable')
+    def tweakPartitionTable(self):
+        node_list = request.query.node_list
+        self._askPrimary(Packets.TweakPartitionTable(map(int,
+            request.query.node_list.split(',')) if node_list else ()))
+
+    @bottle.route('/getNodeList')
+    def getNodeList(self):
+        node_type = request.query.node_type
+        if node_type:
+            node_type = getattr(NodeTypes, node_type)
+            node_filter = lambda node: node.getType() is node_type
+        else:
+            node_filter = None
+        node_list = []
+        self._getMasterConnection()
+        for node in self.nm.getList(node_filter):
+            node_type, address, uuid, state = node = node.asTuple()
+            node_list.append((str(node_type), address, uuid, str(state)))
+        response.content_type = 'application/json'
+        return json.dumps(node_list)
+
+    @bottle.route('/getPrimary')
+    def getPrimary(self):
+        return str(getattr(self.master_node, 'getUUID', raiseNotReady)())
+
+    def _setNodeState(self, node, state):
+        self._askPrimary(Packets.SetNodeState(node, state))
+
+    @bottle.route('/killNode')
+    def killNode(self):
+        self._setNodeState(int(request.query.node), NodeStates.UNKNOWN)
+
+    @bottle.route('/dropNode')
+    def killNode(self):
+        self._setNodeState(int(request.query.node), NodeStates.DOWN)
+
+    @bottle.route('/getPartitionRowList')
+    def getPartitionRowList(self):
+        min_offset = int(request.query.min_offset)
+        max_offset = int(request.query.max_offset)
+        uuid = request.query.node
+        uuid = int(uuid) if uuid else None
         row_list = []
         if max_offset == 0:
             max_offset = self.pt.getPartitions()
@@ -140,11 +219,30 @@ class Application(BaseApplication):
             try:
                 for cell in self.pt.getCellList(offset):
                     if uuid is None or cell.getUUID() == uuid:
-                        row.append((cell.getUUID(), cell.getState()))
+                        row.append((cell.getUUID(), str(cell.getState())))
             except TypeError:
                 pass
             row_list.append((offset, row))
         except IndexError:
-            conn.notify(Errors.ProtocolError('invalid partition table offset'))
-        else:
-            conn.answer(Packets.AnswerPartitionList(self.pt.getID(), row_list))
+            raise HTTPError(400, 'invalid partition table offset')
+        response.content_type = 'application/json'
+        return json.dumps((self.pt.getID(), row_list))
+
+    @bottle.route('/checkReplicas')
+    def checkReplicas(self):
+        partition_dict = {}
+        for partition in request.query.pt.split(','):
+            partition, source = partition.split(':')
+            source = int(source) if source else None
+            if partition:
+                partition_dict[int(partition)] = source
+            elif partition_dict:
+                raise HTTPError(400)
+            else:
+                self._getMasterConnection() # just for correct error handling
+                partition_dict = dict.fromkeys(xrange(self.pt.getPartitions()),
+                    source)
+        max_tid = request.query.max_tid
+        self._askPrimary(Packets.CheckReplicas(partition_dict,
+            self.asTID(request.query.min_tid),
+            self.asTID(max_tid) if max_tid else None))
@@ -14,73 +14,18 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-from neo.lib import logging, protocol
-from neo.lib.handler import EventHandler
-from neo.lib.protocol import uuid_str, Packets
+from bottle import HTTPError
+from neo.lib import logging
+from neo.lib.handler import AnswerBaseHandler, EventHandler, MTEventHandler
 from neo.lib.exception import PrimaryFailure
 
-def check_primary_master(func):
-    def wrapper(self, *args, **kw):
-        if self.app.bootstrapped:
-            return func(self, *args, **kw)
-        raise protocol.NotReadyError('Not connected to a primary master.')
-    return wrapper
-
-def forward_ask(klass):
-    return check_primary_master(lambda self, conn, *args, **kw:
-        self.app.master_conn.ask(klass(*args, **kw),
-            conn=conn, msg_id=conn.getPeerId()))
-
-class AdminEventHandler(EventHandler):
-    """This class deals with events for administrating cluster."""
-
-    @check_primary_master
-    def askPartitionList(self, conn, min_offset, max_offset, uuid):
-        logging.info("ask partition list from %s to %s for %s",
-                     min_offset, max_offset, uuid_str(uuid))
-        self.app.sendPartitionTable(conn, min_offset, max_offset, uuid)
-
-    @check_primary_master
-    def askNodeList(self, conn, node_type):
-        if node_type is None:
-            node_type = 'all'
-            node_filter = None
-        else:
-            node_filter = lambda n: n.getType() is node_type
-        logging.info("ask list of %s nodes", node_type)
-        node_list = self.app.nm.getList(node_filter)
-        node_information_list = [node.asTuple() for node in node_list ]
-        p = Packets.AnswerNodeList(node_information_list)
-        conn.answer(p)
-
-    @check_primary_master
-    def askClusterState(self, conn):
-        conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
-
-    @check_primary_master
-    def askPrimary(self, conn):
-        master_node = self.app.master_node
-        conn.answer(Packets.AnswerPrimary(master_node.getUUID()))
-
-    askLastIDs = forward_ask(Packets.AskLastIDs)
-    askLastTransaction = forward_ask(Packets.AskLastTransaction)
-    addPendingNodes = forward_ask(Packets.AddPendingNodes)
-    tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
-    setClusterState = forward_ask(Packets.SetClusterState)
-    setNodeState = forward_ask(Packets.SetNodeState)
-    checkReplicas = forward_ask(Packets.CheckReplicas)
 
 class MasterEventHandler(EventHandler):
-    """ This class is just used to dispacth message to right handler"""
 
     def _connectionLost(self, conn):
-        app = self.app
-        if app.listening_conn: # if running
-            assert app.master_conn in (conn, None)
-            conn.cancelRequests("connection to master lost")
-            app.reset()
-            app.uuid = None
+        conn.cancelRequests("connection to master lost")
+        self.app.nm.getByUUID(conn.getUUID()).setUnknown()
+        if self.app.master_conn is not None:
+            assert self.app.master_conn is conn
             raise PrimaryFailure
 
     def connectionFailed(self, conn):
@@ -89,18 +34,6 @@ class MasterEventHandler(EventHandler):
     def connectionClosed(self, conn):
         self._connectionLost(conn)
 
-    def dispatch(self, conn, packet, kw={}):
-        if 'conn' in kw:
-            # expected answer
-            if packet.isResponse():
-                packet.setId(kw['msg_id'])
-                kw['conn'].answer(packet)
-            else:
-                self.app.request_handler.dispatch(conn, packet, kw)
-        else:
-            # unexpected answers and notifications
-            super(MasterEventHandler, self).dispatch(conn, packet, kw)
-
     def answerClusterState(self, conn, state):
         self.app.cluster_state = state
@@ -109,23 +42,22 @@ class MasterEventHandler(EventHandler):
         # implemented for factorize code (as done for bootstrap)
         logging.debug("answerNodeInformation")
 
-    def notifyPartitionChanges(self, conn, ptid, cell_list):
-        self.app.pt.update(ptid, cell_list, self.app.nm)
+    def notifyNodeInformation(self, conn, node_list):
+        self.app.nm.update(node_list)
 
     def answerPartitionTable(self, conn, ptid, row_list):
         self.app.pt.load(ptid, row_list, self.app.nm)
-        self.app.bootstrapped = True
 
-    def sendPartitionTable(self, conn, ptid, row_list):
-        if self.app.bootstrapped:
-            self.app.pt.load(ptid, row_list, self.app.nm)
+
+class MasterNotificationsHandler(MasterEventHandler, MTEventHandler):
 
-    def notifyClusterInformation(self, conn, cluster_state):
-        self.app.cluster_state = cluster_state
+    notifyClusterInformation = MasterEventHandler.answerClusterState.im_func
+    sendPartitionTable = MasterEventHandler.answerPartitionTable.im_func
 
-    def notifyNodeInformation(self, conn, node_list):
-        self.app.nm.update(node_list)
+    def notifyPartitionChanges(self, conn, ptid, cell_list):
+        self.app.pt.update(ptid, cell_list, self.app.nm)
 
-class MasterRequestEventHandler(EventHandler):
+
+class PrimaryAnswersHandler(AnswerBaseHandler):
     """ This class handle all answer from primary master node"""
-    # XXX: to be deleted ?
+
+    def protocolError(self, conn, message):
+        raise HTTPError(400, message)
@@ -592,6 +592,11 @@ class ClientConnection(Connection):
         handler.connectionStarted(self)
         self._connect()
 
+    def convertToMT(self, dispatcher):
+        assert self.__class__ is ClientConnection, self
+        self.__class__ = MTClientConnection
+        self._initMT(dispatcher)
+
     def _connect(self):
         try:
             connected = self.connector.makeClientConnection()
@@ -688,11 +693,14 @@ class MTClientConnection(ClientConnection):
         return wrapper
 
     def __init__(self, *args, **kwargs):
-        self.lock = lock = RLock()
-        self.dispatcher = kwargs.pop('dispatcher')
-        with lock:
+        self._initMT(kwargs.pop('dispatcher'))
+        with self.lock:
             super(MTClientConnection, self).__init__(*args, **kwargs)
 
+    def _initMT(self, dispatcher):
+        self.lock = RLock()
+        self.dispatcher = dispatcher
+
     def ask(self, packet, timeout=CRITICAL_TIMEOUT, on_timeout=None,
             queue=None, **kw):
         with self.lock:
@@ -135,10 +135,16 @@ class ThreadedApplication(BaseApplication):
         handler.dispatch(conn, packet, kw)
 
     def _ask(self, conn, packet, handler=None, **kw):
-        self.setHandlerData(None)
-        queue = self._thread_container.queue
-        msg_id = conn.ask(packet, queue=queue, **kw)
-        get = queue.get
+        # The following line is more than optimization. If an admin node sends
+        # a packet that causes the master to disconnect (e.g. stop a cluster),
+        # we want at least to return the answer for this request, even if the
+        # polling thread already exited and cleared self.__dict__: returning
+        # the result of getHandlerData() would raise an AttributeError.
+        # This is tested by testShutdown (neo.tests.threaded.test.Test).
+        thread_container = self._thread_container
+        thread_container.answer = None
+        msg_id = conn.ask(packet, queue=thread_container.queue, **kw)
+        get = thread_container.queue.get
         _handlePacket = self._handlePacket
         while True:
             qconn, qpacket, kw = get(True)
@@ -152,7 +158,6 @@ class ThreadedApplication(BaseApplication):
                     raise ValueError, 'ForgottenPacket for an ' \
                         'explicitely expected packet.'
                 _handlePacket(qconn, qpacket, kw, handler)
-                break
+                return thread_container.answer # see above comment
             if not is_forgotten and qpacket is not None:
                 _handlePacket(qconn, qpacket, kw)
-        return self.getHandlerData()
@@ -14,157 +14,100 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-from neo.lib.app import BaseApplication
-from neo.lib.connection import ClientConnection
-from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
-from .handler import CommandEventHandler
+import json, socket
+from urllib import URLopener, urlencode
+from neo.lib.protocol import CellStates, ClusterStates, NodeTypes, NodeStates, \
+    ZERO_TID
+from neo.lib.util import u64
 
 class NotReadyException(Exception):
     pass
 
-class NeoCTL(BaseApplication):
+class NeoCTL(object):
 
-    connection = None
-    connected = False
-
-    def __init__(self, address, **kw):
-        super(NeoCTL, self).__init__(**kw)
-        self.server = self.nm.createAdmin(address=address)
-        self.handler = CommandEventHandler(self)
-        self.response_queue = []
-
-    def __getConnection(self):
-        if not self.connected:
-            self.connection = ClientConnection(self, self.handler, self.server)
-            # Never delay reconnection to master. This speeds up unit tests
-            # and it should not change anything for normal use.
-            self.connection.setReconnectionNoDelay()
-            while not self.connected:
-                self.em.poll(1)
-                if self.connection is None:
-                    raise NotReadyException('not connected')
-        return self.connection
-
-    def __ask(self, packet):
-        # TODO: make thread-safe
-        connection = self.__getConnection()
-        connection.ask(packet)
-        response_queue = self.response_queue
-        assert len(response_queue) == 0
-        while self.connected:
-            self.em.poll(1)
-            if response_queue:
-                break
-        else:
-            raise NotReadyException, 'Connection closed'
-        response = response_queue.pop()
-        if response[0] == Packets.Error and \
-           response[1] == ErrorCodes.NOT_READY:
-            raise NotReadyException(response[2])
-        return response
+    def __init__(self, address):
+        host, port = address
+        if ":" in host:
+            host = "[%s]" % host
+        self.base_url = "http://%s:%s/" % (host, port)
+        self._open = URLopener().open
+
+    def _ask(self, path, **kw):
+        if kw:
+            path += "?" + urlencode(sorted(x for x in kw.iteritems()
+                                             if '' is not x[1] is not None))
+        try:
+            return self._open(self.base_url + path).read()
+        except IOError, e:
+            e0 = e[0]
+            if e0 == 'socket error' or e0 == 'http error' and e[1] == 503:
+                raise NotReadyException
+            raise
 
     def enableStorageList(self, uuid_list):
         """
         Put all given storage nodes in "running" state.
         """
-        packet = Packets.AddPendingNodes(uuid_list)
-        response = self.__ask(packet)
-        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
-            raise RuntimeError(response)
-        return response[2]
+        self._ask('enableStorageList', node_list=','.join(map(str, uuid_list)))
 
     def tweakPartitionTable(self, uuid_list=()):
-        response = self.__ask(Packets.TweakPartitionTable(uuid_list))
-        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
-            raise RuntimeError(response)
-        return response[2]
+        self._ask('tweakPartitionTable', node_list=','.join(map(str, uuid_list)))
 
     def setClusterState(self, state):
         """
         Set cluster state.
         """
-        packet = Packets.SetClusterState(state)
-        response = self.__ask(packet)
-        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
-            raise RuntimeError(response)
-        return response[2]
-
-    def _setNodeState(self, node, state):
-        """
-        Kill node, or remove it permanently
-        """
-        response = self.__ask(Packets.SetNodeState(node, state))
-        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
-            raise RuntimeError(response)
-        return response[2]
+        self._ask('setClusterState', state=state)
 
     def getClusterState(self):
         """
         Get cluster state.
         """
-        packet = Packets.AskClusterState()
-        response = self.__ask(packet)
-        if response[0] != Packets.AnswerClusterState:
-            raise RuntimeError(response)
-        return response[1]
-
-    def getLastIds(self):
-        response = self.__ask(Packets.AskLastIDs())
-        if response[0] != Packets.AnswerLastIDs:
-            raise RuntimeError(response)
-        return response[1:]
-
-    def getLastTransaction(self):
-        response = self.__ask(Packets.AskLastTransaction())
-        if response[0] != Packets.AnswerLastTransaction:
-            raise RuntimeError(response)
-        return response[1]
+        state = self._ask('getClusterState')
+        if state:
+            return getattr(ClusterStates, state)
 
     def getNodeList(self, node_type=None):
         """
         Get a list of nodes, filtering with given type.
         """
-        packet = Packets.AskNodeList(node_type)
-        response = self.__ask(packet)
-        if response[0] != Packets.AnswerNodeList:
-            raise RuntimeError(response)
-        return response[1] # node_list
+        node_list = json.loads(self._ask('getNodeList', node_type=node_type))
+        return ((getattr(NodeTypes, node_type), address and tuple(address),
+                 uuid, getattr(NodeStates, state))
+                for node_type, address, uuid, state in node_list)
 
     def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
         """
         Get a list of partition rows, bounded by min & max and involving
         given node.
         """
-        packet = Packets.AskPartitionList(min_offset, max_offset, node)
-        response = self.__ask(packet)
-        if response[0] != Packets.AnswerPartitionList:
-            raise RuntimeError(response)
-        return response[1:3] # ptid, row_list
+        ptid, row_list = json.loads(self._ask('getPartitionRowList',
+            min_offset=min_offset, max_offset=max_offset, node=node))
+        return ptid, [(offset, [(node, getattr(CellStates, state))
+                                for node, state in row])
+                      for offset, row in row_list]
 
     def startCluster(self):
         """
         Set cluster into "verifying" state.
         """
-        return self.setClusterState(ClusterStates.VERIFYING)
+        self._ask('startCluster')
 
     def killNode(self, node):
-        return self._setNodeState(node, NodeStates.UNKNOWN)
+        self._ask('killNode', node=node)
 
     def dropNode(self, node):
-        return self._setNodeState(node, NodeStates.DOWN)
+        self._ask('dropNode', node=node)
 
     def getPrimary(self):
         """
         Return the primary master UUID.
         """
-        packet = Packets.AskPrimary()
-        response = self.__ask(packet)
-        if response[0] != Packets.AnswerPrimary:
-            raise RuntimeError(response)
-        return response[1]
-
-    def checkReplicas(self, *args):
-        response = self.__ask(Packets.CheckReplicas(*args))
-        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
-            raise RuntimeError(response)
-        return response[2]
+        return int(self._ask('getPrimary'))
+
+    def checkReplicas(self, partition_dict, min_tid=ZERO_TID, max_tid=None):
+        kw = {'pt': ','.join('%s:%s' % (k, '' if v is None else v)
+                             for k, v in partition_dict.iteritems())}
+        if max_tid is not None:
+            kw['max_tid'] = u64(max_tid)
+        self._ask('checkReplicas', min_tid=u64(min_tid), **kw)
@@ -29,6 +29,7 @@ defaults = dict(
     masters = '127.0.0.1:10000',
 )
 
+
 def main(args=None):
     # build configuration dict from command line options
     (options, args) = parser.parse_args(args=args)
@@ -39,6 +40,5 @@ def main(args=None):
 
     # and then, load and run the application
     from neo.admin.app import Application
-    app = Application(config)
-    app.run()
+    host, port = config.getBind()
+    Application(config).serve(host=host, port=port)
@@ -363,7 +363,7 @@ class NEOCluster(object):
                     pending_count += 1
                 if pending_count == target[0]:
                     neoctl.startCluster()
-            except (NotReadyException, RuntimeError):
+            except (NotReadyException, IOError):
                 pass
             if not pdb.wait(test, MAX_START_TIME):
                 raise AssertionError('Timeout when starting cluster')
@@ -47,7 +47,7 @@ class MasterTests(NEOFunctionalTest):
                 break
         neoctl.killNode(uuid)
         self.neo.expectDead(master)
-        self.assertRaises(RuntimeError, neoctl.killNode, primary_uuid)
+        self.assertRaises(IOError, neoctl.killNode, primary_uuid)
 
     def testStoppingPrimaryWithTwoSecondaries(self):
         # Wait for masters to stabilize
@@ -173,7 +173,7 @@ class StorageTests(NEOFunctionalTest):
         self.neo.expectOudatedCells(2)
         self.neo.expectClusterRunning()
-        self.assertRaises(RuntimeError, self.neo.neoctl.killNode,
+        self.assertRaises(IOError, self.neo.neoctl.killNode,
             started[1].getUUID())
         started[1].stop()
         # Cluster not operational anymore. Only cells of second storage that
@@ -324,7 +324,7 @@ class StorageTests(NEOFunctionalTest):
         self.neo.expectStorageNotKnown(started[0])
         self.neo.expectAssignedCells(started[0], 0)
         self.neo.expectAssignedCells(started[1], 10)
-        self.assertRaises(RuntimeError, self.neo.neoctl.dropNode,
+        self.assertRaises(IOError, self.neo.neoctl.dropNode,
            started[1].getUUID())
         self.neo.expectClusterRunning()
@@ -21,11 +21,13 @@ import traceback
 from collections import deque
 from ConfigParser import SafeConfigParser
 from contextlib import contextmanager
+from cStringIO import StringIO
 from itertools import count
 from functools import wraps
+from urllib import splitquery
 from zlib import decompress
 from mock import Mock
-import transaction, ZODB
+import bottle, transaction, ZODB
 import neo.admin.app, neo.master.app, neo.storage.app
 import neo.client.app, neo.neoctl.app
 from neo.client import Storage
@@ -162,14 +164,13 @@ class Serialized(object):
             next_lock.release()
         cls._sched_lock.acquire()
 
-    def __init__(self, app, busy=True):
+    def __init__(self, app):
         self._epoll = app.em.epoll
         app.em.epoll = self
         # XXX: It may have been initialized before the SimpleQueue is patched.
         thread_container = getattr(app, '_thread_container', None)
         thread_container is None or thread_container.__init__()
-        if busy:
-            self._busy.add(self) # block tic until app waits for polling
+        self._busy.add(self) # block tic until app waits for polling
 
     def __getattr__(self, attr):
         if attr in ('close', 'modify', 'register', 'unregister'):
@@ -207,20 +208,6 @@ class Serialized(object):
                 cls._epoll.unregister(fd)
         self._release_next()
 
-class TestSerialized(Serialized):
-
-    def __init__(*args):
-        Serialized.__init__(busy=False, *args)
-
-    def poll(self, timeout):
-        if timeout:
-            while 1:
-                r = self._epoll.poll(0)
-                if r:
-                    return r
-                Serialized.tic(step=1)
-        return self._epoll.poll(timeout)
-
 class Node(object):
@@ -236,6 +223,24 @@ class Node(object):
     def filterConnection(self, *peers):
         return ConnectionFilter(self.getConnectionList(*peers))
 
+    def _run(self):
+        try:
+            super(Node, self)._run()
+        finally:
+            self._afterRun()
+            self.em.epoll.exit()
+
+    def _afterRun(self):
+        logging.debug('stopping %r', self)
+        try:
+            self.listening_conn.close()
+        except AttributeError:
+            pass
+
+    def start(self):
+        isinstance(self.em.epoll, Serialized) or Serialized(self)
+        super(Node, self).start()
+
 class ServerNode(Node):
 
     _server_class_dict = {}
@@ -299,32 +304,43 @@ class ServerNode(Node):
             self.close()
         self.__init__(**kw)
 
-    def start(self):
-        Serialized(self)
-        threading.Thread.start(self)
-
-    def run(self):
-        try:
-            super(ServerNode, self).run()
-        finally:
-            self._afterRun()
-            logging.debug('stopping %r', self)
-            self.em.epoll.exit()
-
-    def _afterRun(self):
-        try:
-            self.listening_conn.close()
-        except AttributeError:
-            pass
-
     def getListeningAddress(self):
         try:
             return self.listening_conn.getAddress()
         except AttributeError:
             raise ConnectorException
 
-class AdminApplication(ServerNode, neo.admin.app.Application):
-    pass
+class ThreadedNode(Node):
+
+    def __init__(self, *args, **kw):
+        self._on_thread_exit = kw.pop("on_thread_exit", None)
+        super(ThreadedNode, self).__init__(*args, **kw)
+        self.poll_thread.node_name = getattr(self, 'node_name', self.name)
+
+    def _afterRun(self):
+        x = self._on_thread_exit
+        if x is not None:
+            self._on_thread_exit = None
+            x()
+
+
+class AdminApplication(ThreadedNode, ServerNode, neo.admin.app.Application):
+
+    def interrupt_main(self):
+        pass
+
+    def start(self):
+        super(AdminApplication, self).start()
+        return
+        host, port = BIND
+        l = threading.Lock()
+        l.acquire()
+        self.start = l.release
+        try:
+            self.run = lambda: self.serve(host=host, port=port)
+            threading.Thread.start(self)
+            l.acquire()
+        finally:
+            del self.start, self.run
 
 class MasterApplication(ServerNode, neo.master.app.Application):
     pass
@@ -366,21 +382,7 @@ class StorageApplication(ServerNode, neo.storage.app.Application):
         (r,), = self.dm.query("SELECT COUNT(*) FROM " + table)
         return r
 
-class ClientApplication(Node, neo.client.app.Application):
-
-    def __init__(self, master_nodes, name, **kw):
-        super(ClientApplication, self).__init__(master_nodes, name, **kw)
-        self.poll_thread.node_name = name
-
-    def _run(self):
-        try:
-            super(ClientApplication, self)._run()
-        finally:
-            self.em.epoll.exit()
-
-    def start(self):
-        isinstance(self.em.epoll, Serialized) or Serialized(self)
-        super(ClientApplication, self).start()
+class ClientApplication(ThreadedNode, neo.client.app.Application):
 
     def getConnectionList(self, *peers):
         for peer in peers:
@@ -392,10 +394,25 @@ class ClientApplication(Node, neo.client.app.Application):
             yield conn
 
 class NeoCTL(neo.neoctl.app.NeoCTL):
+    # Bypass HTTP layer
 
-    def __init__(self, *args, **kw):
-        super(NeoCTL, self).__init__(*args, **kw)
-        TestSerialized(self)
+    base_url = '/'
+
+    def __init__(self, cluster):
+        self._cluster = cluster
+
+    def _open(self, path):
+        environ = {'REQUEST_METHOD': 'GET'}
+        environ['PATH_INFO'], environ['QUERY_STRING'] = splitquery(path)
+        bottle.request.bind(environ)
+        route, args = self._cluster.admin.bottle.match(environ)
+        try:
+            result = route.call(*args)
+            if isinstance(result, Exception):
+                raise result
+        except bottle.HTTPError, e:
+            raise IOError('http error', e.status_code)
+        return StringIO(result or '')
 
 class LoggerThreadName(str):
@@ -502,6 +519,7 @@ class ConnectionFilter(object):
 
 class NEOCluster(object):
 
+    admin = None
     SSL = None
 
     def __init__(orig, self): # temporary definition for SimpleQueue patch
@@ -564,9 +582,9 @@ class NEOCluster(object):
                        for _ in xrange(master_count)]
         self.master_nodes = ' '.join('%s:%s' % x for x in master_list)
         weak_self = weakref.proxy(self)
-        kw = dict(cluster=weak_self, getReplicas=replicas, getAdapter=adapter,
-            getPartitions=partitions, getReset=clear_databases,
-            getSSL=self.SSL)
+        self._server_kw = kw = dict(cluster=weak_self, getReplicas=replicas,
+            getAdapter=adapter, getPartitions=partitions,
+            getReset=clear_databases, getSSL=self.SSL)
         if upstream is not None:
             self.upstream = weakref.proxy(upstream)
             kw.update(getUpstreamCluster=upstream.name,
@@ -602,14 +620,13 @@ class NEOCluster(object):
             kw["getAdapter"] = "Importer"
         self.storage_list = [StorageApplication(getDatabase=db % x, **kw)
                              for x in db_list]
-        self.admin_list = [AdminApplication(**kw)]
-        self.neoctl = NeoCTL(self.admin.getVirtualAddress(), ssl=self.SSL)
+        self.neoctl = NeoCTL(weak_self)
 
     def __repr__(self):
         return "<%s(%s) at 0x%x>" % (self.__class__.__name__,
                                      self.name, id(self))
 
-    # A few shortcuts that work when there's only 1 master/storage/admin
+    # A few shortcuts that work when there's only 1 master/storage
     @property
     def master(self):
         master, = self.master_list
@@ -618,10 +635,6 @@ class NEOCluster(object):
     def storage(self):
         storage, = self.storage_list
         return storage
-    @property
-    def admin(self):
-        admin, = self.admin_list
-        return admin
     ###
 
     @property
@@ -630,20 +643,19 @@ class NEOCluster(object):
         return master
 
     def reset(self, clear_database=False):
-        for node_type in 'master', 'storage', 'admin':
-            kw = {}
-            if node_type == 'storage':
-                kw['clear_database'] = clear_database
-            for node in getattr(self, node_type + '_list'):
-                node.resetNode(**kw)
-        self.neoctl.close()
-        self.neoctl = NeoCTL(self.admin.getVirtualAddress(), ssl=self.SSL)
+        for node in self.master_list:
+            node.resetNode()
+        for node in self.storage_list:
+            node.resetNode(clear_database=clear_database)
 
     def start(self, storage_list=None, fast_startup=False):
         self._patch()
-        for node_type in 'master', 'admin':
-            for node in getattr(self, node_type + '_list'):
-                node.start()
+        for node in self.master_list:
+            node.start()
+        self.admin = AdminApplication(
+            on_thread_exit=lambda: delattr(self, "admin"),
+            **self._server_kw)
+        self.admin.start()
         Serialized.tic()
         if fast_startup:
             self.startCluster()
@@ -659,20 +671,13 @@ class NEOCluster(object):
         assert state in (ClusterStates.RUNNING, ClusterStates.BACKINGUP), state
         self.enableStorageList(storage_list)
 
-    def newClient(self):
-        return ClientApplication(name=self.name, master_nodes=self.master_nodes,
-            compress=self.compress, ssl=self.SSL)
-
     @cached_property
     def client(self):
-        client = self.newClient()
-        # Make sure client won't be reused after it was closed.
-        def close():
-            client = self.client
-            del self.client, client.close
-            client.close()
-        client.close = close
-        return client
+        return self.newClient(on_thread_exit=lambda: delattr(self, "client"))
+
+    def newClient(self, **kw):
+        return ClientApplication(name=self.name, master_nodes=self.master_nodes,
+            compress=self.compress, ssl=self.SSL, **kw)
 
     @cached_property
     def db(self):
@@ -681,7 +686,7 @@ class NEOCluster(object):
     def startCluster(self):
         try:
             self.neoctl.startCluster()
-        except RuntimeError:
+        except IOError:
             Serialized.tic()
             if self.neoctl.getClusterState() not in (
                     ClusterStates.BACKINGUP,
@@ -696,8 +701,13 @@ class NEOCluster(object):
         for node in storage_list:
             assert self.getNodeState(node) == NodeStates.RUNNING
 
-    def join(self, thread_list, timeout=5):
+    def join(self, thread_list, threaded_node_list=(), timeout=5):
         timeout += time.time()
+        for node in threaded_node_list:
+            try:
+                thread_list.append(node.poll_thread)
+            except AttributeError:
+                pass # node is None or thread is already stopped
         while thread_list:
             assert time.time() < timeout
             Serialized.tic()
@@ -705,16 +715,14 @@ class NEOCluster(object):
 
     def stop(self):
         logging.debug("stopping %s", self)
+        admin = self.admin
+        admin is None or admin.close()
         client = self.__dict__.get("client")
         client is None or self.__dict__.pop("db", client).close()
-        node_list = self.admin_list + self.storage_list + self.master_list
+        node_list = self.storage_list + self.master_list
         for node in node_list:
             node.em.wakeup(True)
-        try:
-            node_list.append(client.poll_thread)
-        except AttributeError: # client is None or thread is already stopped
-            pass
-        self.join(node_list)
+        self.join(node_list, (admin, client))
         logging.debug("stopped %s", self)
         self._unpatch()
@@ -761,8 +769,7 @@ class NEOCluster(object):
 
     def __del__(self, __print_exc=traceback.print_exc):
         try:
-            self.neoctl.close()
-            for node_type in 'admin', 'storage', 'master':
+            for node_type in 'storage', 'master':
                 for node in getattr(self, node_type + '_list'):
                     node.close()
         except:
@@ -534,9 +534,8 @@ class Test(NEOThreadedTest):
             # tell admin to shutdown the cluster
             cluster.neoctl.setClusterState(ClusterStates.STOPPING)
             # all nodes except clients should exit
-            cluster.join(cluster.master_list
-                         + cluster.storage_list
-                         + cluster.admin_list)
+            cluster.join(cluster.master_list + cluster.storage_list,
+                         (cluster.admin,))
         finally:
             cluster.stop()
         cluster.reset() # reopen DB to check partition tables
@@ -26,7 +26,7 @@ if not os.path.exists('mock.py'):
 zodb_require = ['ZODB3>=3.10', 'ZODB3<3.11dev']
 
 extras_require = {
-    'admin': [],
+    'admin': ['bottle'],
     'client': zodb_require,
     'ctl': [],
    'master': [],