bootstrap.py 10.8 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18 19
import logging

20
from neo import protocol
21 22 23 24
from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
        MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo.node import MasterNode, StorageNode, ClientNode
Yoshinori Okuji's avatar
Yoshinori Okuji committed
25
from neo.connection import ClientConnection
26
from neo.protocol import Packet, UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
27
from neo.pt import PartitionTable
28
from neo.storage.verification import VerificationEventHandler
29
from neo.util import dump
30 31
from neo.handler import identification_required, restrict_node_types, \
        server_connection_required, client_connection_required
32 33 34 35 36 37 38 39 40 41

class BootstrapEventHandler(StorageEventHandler):
    """This class deals with events for a bootstrap phase.

    While bootstrapping, the storage node connects to master nodes,
    identifies itself, and asks which master is the primary one.  Once it is
    connected to the primary master, the connection is handed over to a
    VerificationEventHandler.  Incoming (server) connections are only
    accepted so that a master mistaking this node for a master can be told
    otherwise.
    """

    def _forgetTryingMaster(self, conn):
        """Stop relying on the master reached through a lost client link.

        Server connections (opened by a peer towards this node) are ignored;
        only connections this node opened towards a master matter here.
        """
        if conn.isServerConnection():
            return
        app = self.app
        if app.trying_master_node is app.primary_master_node:
            # The link to the node believed to be the primary master is
            # gone, so that belief can no longer be trusted.
            app.primary_master_node = None
        app.trying_master_node = None

    def connectionCompleted(self, conn):
        """Identify this storage node to the master just connected to.

        Raises RuntimeError if no master connection attempt is in progress.
        """
        app = self.app
        if app.trying_master_node is None:
            # Should not happen.
            raise RuntimeError('connection completed while not trying to connect')

        p = protocol.requestNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
                                    app.server[0], app.server[1], app.name)
        conn.ask(p)
        StorageEventHandler.connectionCompleted(self, conn)

    def connectionFailed(self, conn):
        """Forget the master whose connection attempt failed.

        Raises RuntimeError if no master connection attempt is in progress.
        """
        app = self.app
        if app.trying_master_node is None:
            # Should not happen.
            raise RuntimeError('connection failed while not trying to connect')

        if app.trying_master_node is app.primary_master_node:
            # Tried to connect to a primary master node and failed.
            # So this would effectively mean that it is dead.
            app.primary_master_node = None

        app.trying_master_node = None

        StorageEventHandler.connectionFailed(self, conn)

    def connectionAccepted(self, conn, s, addr):
        """Called when a connection is accepted."""
        # I do not want to accept a connection at this phase, but
        # someone might mistake me as a master node.
        StorageEventHandler.connectionAccepted(self, conn, s, addr)

    def timeoutExpired(self, conn):
        """Forget the master reached through ``conn`` after a timeout."""
        self._forgetTryingMaster(conn)
        StorageEventHandler.timeoutExpired(self, conn)

    def connectionClosed(self, conn):
        """Forget the master reached through ``conn`` once it is closed."""
        self._forgetTryingMaster(conn)
        StorageEventHandler.connectionClosed(self, conn)

    def peerBroken(self, conn):
        """Forget the master reached through ``conn`` when it is broken."""
        self._forgetTryingMaster(conn)
        StorageEventHandler.peerBroken(self, conn)

    def handleNotReady(self, conn, packet, message):
        """The peer refused us as not ready: give up on it and close."""
        if not conn.isServerConnection():
            self.app.trying_master_node = None
        conn.close()

    @server_connection_required
    def handleRequestNodeIdentification(self, conn, packet, node_type,
                                        uuid, ip_address, port, name):
        """Answer a master that mistook this storage node for a master.

        Non-master peers get NotReadyError; peers from another cluster get a
        protocol error; a known node in BROKEN_STATE with the same UUID gets
        BrokenNotDisallowedError.  Otherwise the identification is accepted
        and the connection aborted right after the answer.
        """
        app = self.app
        if node_type != MASTER_NODE_TYPE:
            logging.info('reject a connection from a non-master')
            raise protocol.NotReadyError
        if name != app.name:
            logging.error('reject an alien cluster')
            conn.answer(protocol.protocolError('invalid cluster name'), packet)
            conn.abort()
            return

        addr = (ip_address, port)
        node = app.nm.getNodeByServer(addr)
        if node is None:
            node = MasterNode(server = addr, uuid = uuid)
            app.nm.add(node)
        elif node.getUUID() == uuid and node.getState() == BROKEN_STATE:
            # If this node is broken, reject it.
            raise protocol.BrokenNotDisallowedError

        # Trust the UUID sent by the peer.
        node.setUUID(uuid)
        conn.setUUID(uuid)

        # num_partitions and num_replicas are sent as 0.
        p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
                    app.server[0], app.server[1], 0, 0, uuid)
        conn.answer(p, packet)

        # Now the master node should know that I am not the right one.
        conn.abort()

    @client_connection_required
    def handleAcceptNodeIdentification(self, conn, packet, node_type,
                                       uuid, ip_address, port,
                                       num_partitions, num_replicas, your_uuid):
        """Handle a master's acceptance of our identification request.

        Validates the peer type and address, adopts the cluster's partition
        and replica counts (reloading the partition table when they change),
        records a UUID assigned by the master, then asks for the primary
        master.

        Raises RuntimeError if the master reports a number of partitions
        different from the one already known locally.
        """
        app = self.app
        node = app.nm.getNodeByServer(conn.getAddress())
        if node_type != MASTER_NODE_TYPE:
            # The peer is not a master node!
            logging.error('%s:%d is not a master node', ip_address, port)
            app.nm.remove(node)
            conn.close()
            return
        if conn.getAddress() != (ip_address, port):
            # The server address is different! Then why was
            # the connection successful?
            logging.error('%s:%d is waiting for %s:%d',
                          conn.getAddress()[0], conn.getAddress()[1],
                          ip_address, port)
            app.nm.remove(node)
            conn.close()
            return

        # The number of partitions must never change once known.  Check it
        # first, so a change of replica count cannot mask an inconsistent
        # partition count (the update below would otherwise overwrite it).
        if app.num_partitions is not None and \
               app.num_partitions != num_partitions:
            raise RuntimeError('the number of partitions is inconsistent')

        if app.num_partitions is None or app.num_replicas is None or \
               app.num_replicas != num_replicas:
            # changing number of replicas is not an issue
            app.num_partitions = num_partitions
            app.dm.setNumPartitions(app.num_partitions)
            app.num_replicas = num_replicas
            app.dm.setNumReplicas(app.num_replicas)
            app.pt = PartitionTable(num_partitions, num_replicas)
            app.loadPartitionTable()
            app.ptid = app.dm.getPTID()

        if your_uuid != INVALID_UUID and app.uuid != your_uuid:
            # got an uuid from the primary master
            app.uuid = your_uuid
            app.dm.setUUID(app.uuid)
            logging.info('Got a new UUID from master : %s', dump(app.uuid))

        conn.setUUID(uuid)
        node.setUUID(uuid)

        # Ask a primary master.
        conn.ask(protocol.askPrimaryMaster())

    @client_connection_required
    def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
                                  known_master_list):
        """Register known masters and switch to the primary if connected.

        If the peer names a primary master that is this connection's peer,
        the connection is handed over to a VerificationEventHandler.  If it
        names another known master, or no primary at all, this connection
        is dropped so that another master can be tried.
        """
        app = self.app
        # Register new master nodes.
        for ip_address, port, uuid in known_master_list:
            addr = (ip_address, port)
            n = app.nm.getNodeByServer(addr)
            if n is None:
                n = MasterNode(server = addr)
                app.nm.add(n)

            # Believe the UUID the peer reports, whether the node's UUID
            # was unknown or different.
            if uuid != INVALID_UUID and n.getUUID() != uuid:
                n.setUUID(uuid)

        if primary_uuid == INVALID_UUID:
            if app.primary_master_node is not None:
                # The primary master node is not a primary master node
                # any longer.
                app.primary_master_node = None

            app.trying_master_node = None
            conn.close()
            return

        primary_node = app.nm.getNodeByUUID(primary_uuid)
        if primary_node is None:
            # I don't know such a node. Probably this information
            # is old. So ignore it.
            logging.warning('unknown primary master UUID %s; ignoring',
                            dump(primary_uuid))
            return

        app.primary_master_node = primary_node
        if app.trying_master_node is primary_node:
            # I am connected to the right one.
            logging.info('connected to a primary master node')
            # This is a workaround to prevent handling of
            # packets for the verification phase.
            handler = VerificationEventHandler(app)
            conn.setHandler(handler)
        else:
            app.trying_master_node = None
            conn.close()

    # The packets below are not expected during the bootstrap phase; they
    # are only logged and otherwise ignored.

    def handleAskLastIDs(self, conn, packet):
        logging.warning(r'/!\ handleAskLastIDs')

    def handleAskPartitionTable(self, conn, packet, offset_list):
        logging.warning(r'/!\ handleAskPartitionTable')

    def handleSendPartitionTable(self, conn, packet, ptid, row_list):
        logging.warning(r'/!\ handleSendPartitionTable')

    def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
        logging.warning(r'/!\ handleNotifyPartitionChanges')

    def handleStartOperation(self, conn, packet):
        logging.warning(r'/!\ handleStartOperation')

    def handleStopOperation(self, conn, packet):
        logging.warning(r'/!\ handleStopOperation')

    def handleAskUnfinishedTransactions(self, conn, packet):
        logging.warning(r'/!\ handleAskUnfinishedTransactions')

    def handleAskTransactionInformation(self, conn, packet, tid):
        logging.warning(r'/!\ handleAskTransactionInformation')

    def handleAskObjectPresent(self, conn, packet, oid, tid):
        logging.warning(r'/!\ handleAskObjectPresent')

    def handleDeleteTransaction(self, conn, packet, tid):
        logging.warning(r'/!\ handleDeleteTransaction')

    def handleCommitTransaction(self, conn, packet, tid):
        logging.warning(r'/!\ handleCommitTransaction')

    def handleLockInformation(self, conn, packet, tid):
        logging.warning(r'/!\ handleLockInformation')

    def handleUnlockInformation(self, conn, packet, tid):
        logging.warning(r'/!\ handleUnlockInformation')