neoctl.py 4.94 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
#
# Copyright (C) 2006-2009  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 18 19 20 21 22

from neo.connector import getConnectorHandler
from neo.connection import ClientConnection
from neo.event import EventManager
from neo.neoctl.handler import CommandEventHandler
from neo import protocol
23
from neo.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
24

25 26 27
class NotReadyException(Exception):
    """
      Raised by NeoCTL when a command cannot be carried out right now:
      the connection could not be established, it was closed while
      waiting for an answer, or the peer answered with a NOT_READY error.
    """

28 29 30 31 32 33 34 35 36 37 38 39 40
class NeoCTL(object):
    """
      Synchronous control client: each public method builds one packet,
      sends it over a lazily opened connection and blocks (by polling the
      event manager) until an answer shows up in `response_queue`.

      A response is an indexable sequence whose first item is the packet
      type and whose following items are the packet arguments.
    """

    # Current connection object, or None when there is none.
    connection = None
    # Whether the connection is established.
    # NOTE(review): neither attribute is updated by this class once the
    # connection has been created; presumably CommandEventHandler does it
    # from its connection callbacks - confirm.
    connected = False

    def __init__(self, ip, port, handler):
        """
          Remember the server address and prepare the event machinery.
          No connection is opened here, it is created on first use.

          ip, port -- address of the node to send commands to
          handler  -- connector handler identifier, resolved through
                      getConnectorHandler
        """
        self.connector_handler = getConnectorHandler(handler)
        self.server = (ip, port)
        self.em = EventManager()
        self.handler = CommandEventHandler(self)
        # Answers waiting to be consumed by __ask.
        # NOTE(review): presumably filled by self.handler - confirm.
        self.response_queue = []

    def __getConnection(self):
        """
          Return the connection to the server, opening it first when not
          connected yet.

          Raise NotReadyException if the connection attempt fails.
        """
        if not self.connected:
            self.connection = ClientConnection(
                self.em, self.handler, addr=self.server,
                connector_handler=self.connector_handler)
            # Poll until the attempt is settled: either `connected` becomes
            # true (success) or `connection` is reset to None (failure).
            while not self.connected and self.connection is not None:
                self.em.poll(0)
            if self.connection is None:
                raise NotReadyException
        return self.connection

    def __ask(self, packet):
        """
          Send `packet` and block until its answer is available, then
          return that answer.

          Raise NotReadyException if the connection cannot be obtained, is
          lost while waiting, or if the answer is a NOT_READY error (in
          which case the exception carries the error message).
        """
        # TODO: make thread-safe
        connection = self.__getConnection()
        connection.ask(packet)
        response_queue = self.response_queue
        # No answer may be pending from a previous request.
        assert not response_queue
        while not response_queue:
            self.em.poll(0)
            if not self.connected:
                # Call form of raise: valid on both Python 2 and Python 3,
                # unlike the "raise Class, value" statement.
                raise NotReadyException('Connection closed')
        response = response_queue.pop()
        if response[0] == Packets.Error and \
           response[1] == ErrorCodes.NOT_READY:
            raise NotReadyException(response[2])
        return response

    def enableStorageList(self, uuid_list):
        """
          Put all given storage nodes in "running" state.
        """
        packet = Packets.AddPendingNodes(uuid_list)
        response = self.__ask(packet)
        # Success is reported as an Error packet carrying NO_ERROR.
        assert response[0] == Packets.Error
        assert response[1] == ErrorCodes.NO_ERROR

    def setClusterState(self, state):
        """
          Set cluster state.
        """
        packet = Packets.SetClusterState(state)
        response = self.__ask(packet)
        # Success is reported as an Error packet carrying NO_ERROR.
        assert response[0] == Packets.Error
        assert response[1] == ErrorCodes.NO_ERROR

    def setNodeState(self, node, state, update_partition_table=False):
        """
          Set node state, and allow (or not) updating partition table.
        """
        # The packet is given the flag as integer 0 or 1, whatever truthy
        # or falsy value the caller passed.
        update_partition_table = int(bool(update_partition_table))
        packet = Packets.SetNodeState(node, state, update_partition_table)
        response = self.__ask(packet)
        # Success is reported as an Error packet carrying NO_ERROR.
        assert response[0] == Packets.Error
        assert response[1] == ErrorCodes.NO_ERROR

    def getClusterState(self):
        """
          Get cluster state.
        """
        packet = Packets.AskClusterState()
        response = self.__ask(packet)
        assert response[0] == Packets.AnswerClusterState
        return response[1]

    def getNodeList(self, node_type=None):
        """
          Get a list of nodes, filtering with given type.
        """
        packet = Packets.AskNodeList(node_type)
        response = self.__ask(packet)
        assert response[0] == Packets.AnswerNodeList
        return response[1]

    def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
        """
          Get a list of partition rows, bounded by min & max and involving
          given node.

          Return a 2-tuple made of the first two arguments of the answer.
        """
        packet = Packets.AskPartitionList(min_offset, max_offset, node)
        response = self.__ask(packet)
        assert response[0] == Packets.AnswerPartitionList
        return (response[1], response[2])

    def startCluster(self):
        """
          Set cluster into "verifying" state.
        """
        self.setClusterState(ClusterStates.VERIFYING)

    def dropNode(self, node):
        """
          Set node into "down" state and remove it from partition table.
        """
        self.setNodeState(node, NodeStates.DOWN, update_partition_table=True)

    def getPrimary(self):
        """
          Return the primary master UUID.
        """
        packet = Packets.AskPrimary()
        response = self.__ask(packet)
        assert response[0] == Packets.AnswerPrimary
        return response[1]