verification.py 11.7 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18 19
import logging

20
from neo import protocol
21
from neo.storage.handler import StorageEventHandler
22 23
from neo.protocol import INVALID_OID, INVALID_TID, \
        RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
24 25
        MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
        Packet
Yoshinori Okuji's avatar
Yoshinori Okuji committed
26
from neo.util import dump
27
from neo.node import MasterNode, StorageNode, ClientNode
Yoshinori Okuji's avatar
Yoshinori Okuji committed
28
from neo.connection import ClientConnection
29
from neo.exception import PrimaryFailure, OperationFailure
30 31 32 33 34 35 36 37 38 39 40

class VerificationEventHandler(StorageEventHandler):
    """This class deals with events for a verification phase."""

    def connectionAccepted(self, conn, s, addr):
        """Called when a connection is accepted."""
        # I do not want to accept a connection at this phase, but
        # someone might mistake me as a master node.
        StorageEventHandler.connectionAccepted(self, conn, s, addr)

    def timeoutExpired(self, conn):
41
        if not conn.isServerConnection():
42 43 44 45 46 47 48
            # If a primary master node timeouts, I cannot continue.
            logging.critical('the primary master node times out')
            raise PrimaryFailure('the primary master node times out')

        StorageEventHandler.timeoutExpired(self, conn)

    def connectionClosed(self, conn):
49
        if not conn.isServerConnection():
50 51 52 53 54 55 56
            # If a primary master node closes, I cannot continue.
            logging.critical('the primary master node is dead')
            raise PrimaryFailure('the primary master node is dead')

        StorageEventHandler.connectionClosed(self, conn)

    def peerBroken(self, conn):
57
        if not conn.isServerConnection():
58 59 60 61 62 63 64 65
            # If a primary master node gets broken, I cannot continue.
            logging.critical('the primary master node is broken')
            raise PrimaryFailure('the primary master node is broken')

        StorageEventHandler.peerBroken(self, conn)

    def handleRequestNodeIdentification(self, conn, packet, node_type,
                                        uuid, ip_address, port, name):
66
        if not conn.isServerConnection():
67 68
            self.handleUnexpectedPacket(conn, packet)
        else:
69
            app = self.app
70 71
            if node_type != MASTER_NODE_TYPE:
                logging.info('reject a connection from a non-master')
72
                conn.addPacket(protocol.notReady(packet.getId(), 'retry later'))
73 74 75 76
                conn.abort()
                return
            if name != app.name:
                logging.error('reject an alien cluster')
77
                conn.addPacket(protocol.protocolError(packet.getId(),
78 79 80 81 82 83 84 85 86 87 88 89 90
                                                      'invalid cluster name'))
                conn.abort()
                return

            addr = (ip_address, port)
            node = app.nm.getNodeByServer(addr)
            if node is None:
                node = MasterNode(server = addr, uuid = uuid)
                app.nm.add(node)
            else:
                # If this node is broken, reject it.
                if node.getUUID() == uuid:
                    if node.getState() == BROKEN_STATE:
91
                        p = protocol.brokenNodeDisallowedError(packet.getId(), 'go away')
92 93 94 95 96 97 98 99
                        conn.addPacket(p)
                        conn.abort()
                        return

            # Trust the UUID sent by the peer.
            node.setUUID(uuid)
            conn.setUUID(uuid)

100
            p = protocol.acceptNodeIdentification(packet.getId(), STORAGE_NODE_TYPE,
Aurel's avatar
Aurel committed
101
                                       app.uuid, app.server[0], app.server[1],
102
                                       app.num_partitions, app.num_replicas,
103
                                       uuid)
104 105 106 107 108 109
            conn.addPacket(p)

            # Now the master node should know that I am not the right one.
            conn.abort()

    def handleAcceptNodeIdentification(self, conn, packet, node_type,
Yoshinori Okuji's avatar
Yoshinori Okuji committed
110
                                       uuid, ip_address, port,
111
                                       num_partitions, num_replicas, your_uuid):
112 113 114 115
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
                                  known_master_list):
116
        if not conn.isServerConnection():
117 118 119 120 121 122 123 124
            app = self.app
            if app.primary_master_node.getUUID() != primary_uuid:
                raise PrimaryFailure('the primary master node seems to have changed')
            # XXX is it better to deal with known_master_list here?
            # But a primary master node is supposed not to send any info
            # with this packet, so it would be useless.
        else:
            self.handleUnexpectedPacket(conn, packet)
125 126

    def handleAskLastIDs(self, conn, packet):
127
        if not conn.isServerConnection():
128
            app = self.app
129 130
            oid = app.dm.getLastOID() or INVALID_OID
            tid = app.dm.getLastTID() or INVALID_TID
131
            p = protocol.answerLastIDs(packet.getId(), oid, tid, app.ptid)
132 133 134 135 136
            conn.addPacket(p)
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleAskPartitionTable(self, conn, packet, offset_list):
137
        if not conn.isServerConnection():
138 139 140 141 142
            app = self.app
            row_list = []
            try:
                for offset in offset_list:
                    row = []
143 144 145 146 147
                    try:
                        for cell in app.pt.getCellList(offset):
                            row.append((cell.getUUID(), cell.getState()))
                    except TypeError:
                        pass
148 149
                    row_list.append((offset, row))
            except IndexError:
150
                p = protocol.protocolError(packet.getId(), 
151 152 153 154
                                'invalid partition table offset')
                conn.addPacket(p)
                return

155
            p = protocol.answerPartitionTable(packet.getId(), app.ptid, row_list)
156 157 158 159 160 161 162
            conn.addPacket(p)
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleSendPartitionTable(self, conn, packet, ptid, row_list):
        """A primary master node sends this packet to synchronize a partition
        table. Note that the message can be split into multiple packets."""
163
        if not conn.isServerConnection():
164 165 166 167 168 169 170 171 172 173 174 175
            app = self.app
            nm = app.nm
            pt = app.pt
            if app.ptid != ptid:
                app.ptid = ptid
                pt.clear()

            for offset, row in row_list:
                for uuid, state in row:
                    node = nm.getNodeByUUID(uuid)
                    if node is None:
                        node = StorageNode(uuid = uuid)
176
                        if uuid != app.uuid:
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
                            node.setState(TEMPORARILY_DOWN_STATE)
                        nm.add(node)

                    pt.setCell(offset, node, state)

            if pt.filled():
                # If the table is filled, I assume that the table is ready
                # to use. Thus install it into the database for persistency.
                cell_list = []
                for offset in xrange(app.num_partitions):
                    for cell in pt.getCellList(offset):
                        cell_list.append((offset, cell.getUUID(), 
                                          cell.getState()))
                app.dm.setPartitionTable(ptid, cell_list)
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
        """This is very similar to Send Partition Table, except that
        the information is only about changes from the previous."""
197
        if not conn.isServerConnection():
198 199 200 201 202
            app = self.app
            nm = app.nm
            pt = app.pt
            if app.ptid >= ptid:
                # Ignore this packet.
203
                logging.info('ignoring older partition changes')
204 205 206 207 208 209 210 211
                return

            # First, change the table on memory.
            app.ptid = ptid
            for offset, uuid, state in cell_list:
                node = nm.getNodeByUUID(uuid)
                if node is None:
                    node = StorageNode(uuid = uuid)
Aurel's avatar
Aurel committed
212
                    if uuid != app.uuid:
213 214 215 216 217 218 219 220 221 222 223
                        node.setState(TEMPORARILY_DOWN_STATE)
                    nm.add(node)

                pt.setCell(offset, node, state)

            # Then, the database.
            app.dm.changePartitionTable(ptid, cell_list)
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleStartOperation(self, conn, packet):
224
        if not conn.isServerConnection():
225 226 227 228 229
            self.app.operational = True
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleStopOperation(self, conn, packet):
230
        if not conn.isServerConnection():
231 232 233 234 235
            raise OperationFailure('operation stopped')
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleAskUnfinishedTransactions(self, conn, packet):
236
        if not conn.isServerConnection():
237 238
            app = self.app
            tid_list = app.dm.getUnfinishedTIDList()
239
            p = protocol.answerUnfinishedTransactions(packet.getId(), tid_list)
240 241 242 243 244 245
            conn.addPacket(p)
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleAskTransactionInformation(self, conn, packet, tid):
        app = self.app
246
        if not conn.isServerConnection():
247 248 249 250 251 252 253 254
            # If this is from a primary master node, assume that it wants
            # to obtain information about the transaction, even if it has
            # not been finished.
            t = app.dm.getTransaction(tid, all = True)
        else:
            t = app.dm.getTransaction(tid)

        if t is None:
255
            p = protocol.tidNotFound(packet.getId(), '%s does not exist' % dump(tid))
256
        else:
257
            p = protocol.answerTransactionInformation(packet.getId(), tid, 
258
                                           t[1], t[2], t[3], t[0])
259 260 261
        conn.addPacket(p)

    def handleAskObjectPresent(self, conn, packet, oid, tid):
262
        if not conn.isServerConnection():
263 264
            app = self.app
            if app.dm.objectPresent(oid, tid):
265
                p = protocol.answerObjectPresent(packet.getId(), oid, tid)
266
            else:
267
                p = protocol.oidNotFound(packet.getId(), 
268 269 270 271 272 273
                              '%s:%s do not exist' % (dump(oid), dump(tid)))
            conn.addPacket(p)
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleDeleteTransaction(self, conn, packet, tid):
274
        if not conn.isServerConnection():
275 276 277 278 279 280
            app = self.app
            app.dm.deleteTransaction(tid, all = True)
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleCommitTransaction(self, conn, packet, tid):
281
        if not conn.isServerConnection():
282 283 284 285 286 287 288 289 290 291
            app = self.app
            app.dm.finishTransaction(tid)
        else:
            self.handleUnexpectedPacket(conn, packet)

    def handleLockInformation(self, conn, packet, tid):
        pass

    def handleUnlockInformation(self, conn, packet, tid):
        pass