#
# Copyright (C) 2009-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17

18
from neo import logging
19 20

from neo.handler import EventHandler
21
from neo import protocol
22
from neo.protocol import Packets, Errors
Aurel's avatar
Aurel committed
23
from neo.exception import PrimaryFailure
24 25
from neo.util import dump

26
class AdminEventHandler(EventHandler):
    """Handle the administration requests received by the admin node.

    A request that can be served from the application's local state is
    answered on the requesting connection. Otherwise it is forwarded to the
    primary master, and the forwarded message id is registered on
    app.dispatcher together with the requesting connection and its peer id.
    Forwarding raises protocol.NotReadyError while app.master_conn is None.
    """

    def __getMasterConn(self):
        """Return app.master_conn, raising NotReadyError if it is None."""
        master_conn = self.app.master_conn
        if master_conn is None:
            raise protocol.NotReadyError('Not connected to a primary master.')
        return master_conn

    def askPartitionList(self, conn, min_offset, max_offset, uuid):
        """Serve a partition list request.

        When the application holds a partition table, the request is
        delegated to app.sendPartitionTable. Otherwise the table is asked
        to the primary master and the request parameters are stored on the
        dispatcher under the forwarded message id.
        """
        logging.info("ask partition list from %s to %s for %s" %
                (min_offset, max_offset, dump(uuid)))
        app = self.app
        if app.pt is not None:
            app.sendPartitionTable(conn, min_offset, max_offset, uuid)
            return
        # No partition table yet: it has to come from the primary master.
        master_conn = self.__getMasterConn()
        msg_id = master_conn.ask(Packets.AskPartitionTable([]))
        pending = {
            'min_offset' : min_offset,
            'max_offset' : max_offset,
            'uuid' : uuid,
            'msg_id' : conn.getPeerId(),
        }
        app.dispatcher.register(msg_id, conn, pending)

    def askNodeList(self, conn, node_type):
        """Answer the tuple form of every known node of type node_type."""
        logging.info("ask node list for %s" %(node_type))
        matching = self.app.nm.getList(
            lambda candidate: candidate.getType() is node_type)
        answer = Packets.AnswerNodeList(
            [node.asTuple() for node in matching])
        conn.answer(answer)

    def setNodeState(self, conn, uuid, state, modify_partition_table):
        """Forward a node state change to the primary master.

        Raises protocol.ProtocolError when uuid is unknown to the node
        manager. When the node is already in the requested state and
        modify_partition_table is False, an Ack is answered directly and
        nothing is forwarded.
        """
        logging.info("set node state for %s-%s" %(dump(uuid), state))
        node = self.app.nm.getByUUID(uuid)
        if node is None:
            raise protocol.ProtocolError('invalid uuid')
        nothing_to_do = (node.getState() == state
                         and modify_partition_table is False)
        if nothing_to_do:
            conn.answer(Errors.Ack('no change'))
            return
        # The primary master is the one applying the change.
        master_conn = self.__getMasterConn()
        request = Packets.SetNodeState(uuid, state, modify_partition_table)
        msg_id = master_conn.ask(request)
        self.app.dispatcher.register(
            msg_id, conn, {'msg_id' : conn.getPeerId()})

    def setClusterState(self, conn, state):
        """Forward a cluster state change to the primary master."""
        master_conn = self.__getMasterConn()
        msg_id = master_conn.ask(Packets.SetClusterState(state))
        self.app.dispatcher.register(
            msg_id, conn, {'msg_id' : conn.getPeerId()})

    def addPendingNodes(self, conn, uuid_list):
        """Forward an 'add pending nodes' request to the primary master."""
        master_conn = self.__getMasterConn()
        logging.info('Add nodes %s' % [dump(uuid) for uuid in uuid_list])
        msg_id = master_conn.ask(Packets.AddPendingNodes(uuid_list))
        self.app.dispatcher.register(
            msg_id, conn, {'msg_id' : conn.getPeerId()})

    def askClusterState(self, conn):
        """Answer the cached cluster state, or ask the primary master."""
        cluster_state = self.app.cluster_state
        if cluster_state is not None:
            conn.answer(Packets.AnswerClusterState(cluster_state))
            return
        # Nothing cached yet: the primary master has to provide it first.
        master_conn = self.__getMasterConn()
        msg_id = master_conn.ask(Packets.AskClusterState())
        self.app.dispatcher.register(
            msg_id, conn, {'msg_id' : conn.getPeerId()})

    def askPrimary(self, conn):
        """Answer the UUID of the primary master with an empty list."""
        self.__getMasterConn()
        primary_uuid = self.app.master_node.getUUID()
        conn.answer(Packets.AnswerPrimary(primary_uuid, []))
108 109

class MasterEventHandler(EventHandler):
    """Handle the events of the connection to the primary master.

    Answers whose message id is registered on app.dispatcher are handed to
    app.request_handler; every other packet is dispatched to this class.
    """

    def _connectionLost(self, conn):
        """Forget the primary master and raise PrimaryFailure."""
        app = self.app
        assert app.master_conn in (conn, None)
        app.master_conn = None
        app.master_node = None
        app.uuid = None
        raise PrimaryFailure

    def connectionFailed(self, conn):
        """Treat a failed connection as the loss of the primary master."""
        self._connectionLost(conn)

    def timeoutExpired(self, conn):
        """Treat a timeout as the loss of the primary master."""
        self._connectionLost(conn)

    def connectionClosed(self, conn):
        """Treat a closed connection as the loss of the primary master."""
        self._connectionLost(conn)

    def peerBroken(self, conn):
        """Treat a broken peer as the loss of the primary master."""
        self._connectionLost(conn)

    def dispatch(self, conn, packet):
        """Route packet to the request handler or to this handler."""
        if packet.isResponse() and \
           self.app.dispatcher.registered(packet.getId()):
            # expected answer
            self.app.request_handler.dispatch(conn, packet)
        else:
            # unexpected answers and notifications
            super(MasterEventHandler, self).dispatch(conn, packet)

    def answerNodeInformation(self, conn):
        """Log the answer; no other processing is done here."""
        # XXX: This will no longer exist once the initialization module is
        # implemented to factorize this code (as done for bootstrap)
        logging.debug("answerNodeInformation")

    def answerPartitionTable(self, conn, ptid, row_list):
        """Log the answer; no other processing is done here."""
        # XXX: This will no longer exist once the initialization module is
        # implemented to factorize this code (as done for bootstrap)
        logging.debug("answerPartitionTable")

    def notifyPartitionChanges(self, conn, ptid, cell_list):
        """Apply cell_list to the partition table unless ptid is older."""
        app = self.app
        if ptid < app.ptid:
            # Ignore this packet.
            return
        app.ptid = ptid
        app.pt.update(ptid, cell_list, app.nm)

    def sendPartitionTable(self, conn, ptid, row_list):
        """Load the received rows into the application's partition table.

        The table is cleared first when ptid differs from app.ptid. Nodes
        referenced by a row but unknown to the node manager are created as
        storage nodes.
        """
        app = self.app
        nm = app.nm
        pt = app.pt
        if app.ptid != ptid:
            app.ptid = ptid
            pt.clear()
        for offset, row in row_list:
            for uuid, state in row:
                node = nm.getByUUID(uuid)
                if node is None:
                    # Keep the created node: the result used to be dropped,
                    # so the cell was set with node=None.
                    # NOTE(review): assumes createStorage returns the new
                    # node - confirm against the node manager.
                    node = nm.createStorage(uuid=uuid)
                pt.setCell(offset, node, state)
        pt.log()

    def notifyClusterInformation(self, conn, cluster_state):
        """Cache the cluster state sent by the primary master."""
        self.app.cluster_state = cluster_state

    def notifyNodeInformation(self, conn, node_list):
        """Update the node manager, re-asking the table if not filled."""
        app = self.app
        app.nm.update(node_list)
        if not app.pt.filled():
            # Re-ask partition table, in case node change filled it.
            # XXX: we should only ask it if received states indicates it is
            # possible (ignore TEMPORARILY_DOWN for example)
            conn.ask(Packets.AskPartitionTable([]))
187

188
class MasterRequestEventHandler(EventHandler):
    """Handle the primary master's answers to forwarded requests.

    Each answer is matched, through app.dispatcher, with the connection
    that was registered when the request was forwarded.
    """

    def __answerNeoCTL(self, conn, packet):
        """Pop the registered requester and answer it with packet."""
        msg_id = conn.getPeerId()
        client_conn, kw = self.app.dispatcher.pop(msg_id)
        client_conn.answer(packet)

    def answerClusterState(self, conn, state):
        """Cache the cluster state and relay it to the requester."""
        logging.info("answerClusterState for a conn")
        self.app.cluster_state = state
        self.__answerNeoCTL(conn, Packets.AnswerClusterState(state))

    def answerNewNodes(self, conn, uuid_list):
        """Relay the list of new nodes to the requester."""
        logging.info("answerNewNodes for a conn")
        self.__answerNeoCTL(conn, Packets.AnswerNewNodes(uuid_list))

    def answerPartitionTable(self, conn, ptid, row_list):
        """Send the partition rows originally requested by the client.

        The range and uuid are the ones stored on the dispatcher by
        AdminEventHandler.askPartitionList when it forwarded the request.
        """
        logging.info("answerPartitionTable for a conn")
        client_conn, kw = self.app.dispatcher.pop(conn.getPeerId())
        # Send the client the partition table, with the same arguments as
        # the direct call made by askPartitionList when a table is cached.
        # NOTE(review): assumes app.sendPartitionTable takes
        # (conn, min_offset, max_offset, uuid) - confirm against the app.
        self.app.sendPartitionTable(client_conn, kw['min_offset'],
                                    kw['max_offset'], kw['uuid'])

    def answerNodeState(self, conn, uuid, state):
        """Relay the node state answer to the requester."""
        self.__answerNeoCTL(conn,
                            Packets.AnswerNodeState(uuid, state))

    def ack(self, conn, msg):
        """Relay the master's acknowledgement to the requester."""
        self.__answerNeoCTL(conn, Errors.Ack(msg))

    def protocolError(self, conn, msg):
        """Relay the master's protocol error to the requester."""
        self.__answerNeoCTL(conn, Errors.ProtocolError(msg))
Aurel's avatar
Aurel committed
220