handler.py 16.9 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from neo import logging
19
from neo import protocol
20
from neo.protocol import NodeStates, ErrorCodes, Packets
21
from neo.protocol import PacketMalformedError, UnexpectedPacketError, \
22
        BrokenNodeDisallowedError, NotReadyError, ProtocolError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
23

24

Yoshinori Okuji's avatar
Yoshinori Okuji committed
25 26
class EventHandler(object):
    """This class handles events."""
27

28 29
    def __init__(self, app):
        self.app = app
30 31
        self.packet_dispatch_table = self.__initPacketDispatchTable()
        self.error_dispatch_table = self.__initErrorDispatchTable()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
32

33
    def _packetMalformed(self, conn, packet, message='', *args):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
34
        """Called when a packet is malformed."""
35
        args = (conn.getAddress()[0], conn.getAddress()[1], message)
36 37
        if packet is None:
            # if decoding fail, there's no packet instance 
38
            logging.error('malformed packet from %s:%d: %s', *args)
39
        else:
40
            logging.error('malformed packet %s from %s:%d: %s', packet.getType(), *args)
41
        response = protocol.protocolError(message)
42
        if packet is not None:
43
            conn.answer(response, packet.getId())
44 45
        else:
            conn.notify(response)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
46 47 48
        conn.abort()
        self.peerBroken(conn)

49
    def __unexpectedPacket(self, conn, packet, message=None):
50 51
        """Handle an unexpected packet."""
        if message is None:
52 53
            message = 'unexpected packet type %s in %s' % (packet.getType(),
                    self.__class__.__name__)
54
        else:
55 56
            message = 'unexpected packet: %s in %s' % (message,
                    self.__class__.__name__)
57
        logging.error(message)
58
        conn.answer(protocol.protocolError(message), packet.getId())
59 60
        conn.abort()
        self.peerBroken(conn)
61

Yoshinori Okuji's avatar
Yoshinori Okuji committed
62 63 64
    def dispatch(self, conn, packet):
        """This is a helper method to handle various packet types."""
        try:
65
            try:
66
                method = self.packet_dispatch_table[packet.getType()]
67 68
            except KeyError:
                raise UnexpectedPacketError('no handler found')
69
            args = packet.decode() or ()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
70
            method(conn, packet, *args)
71
        except UnexpectedPacketError, e:
72
            self.__unexpectedPacket(conn, packet, *e.args)
73
        except PacketMalformedError, e:
74
            self._packetMalformed(conn, packet, *e.args)
75 76 77 78
        except BrokenNodeDisallowedError:
            answer_packet = protocol.brokenNodeDisallowedError('go away')
            conn.answer(answer_packet, packet.getId())
            conn.abort()
79 80 81 82 83
        except NotReadyError, message:
            if not message.args:
                message = 'Retry Later'
            message = str(message)
            conn.answer(protocol.notReady(message), packet.getId())
84 85
            conn.abort()
        except ProtocolError, message:
86
            message = str(message)
87 88
            conn.answer(protocol.protocolError(message), packet.getId())
            conn.abort()
89

90 91 92 93 94 95
    def checkClusterName(self, name):
        # raise an exception if the fiven name mismatch the current cluster name
        if self.app.name != name:
            logging.error('reject an alien cluster')
            raise protocol.ProtocolError('invalid cluster name')

Yoshinori Okuji's avatar
Yoshinori Okuji committed
96

97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
    # Network level handlers

    def packetReceived(self, conn, packet):
        """Called when a packet is received."""
        self.dispatch(conn, packet)

    def connectionStarted(self, conn):
        """Called when a connection is started."""
        logging.debug('connection started for %s:%d', *(conn.getAddress()))

    def connectionCompleted(self, conn):
        """Called when a connection is completed."""
        logging.debug('connection completed for %s:%d', *(conn.getAddress()))

    def connectionFailed(self, conn):
        """Called when a connection failed."""
        logging.debug('connection failed for %s:%d', *(conn.getAddress()))

115
    def connectionAccepted(self, conn):
116 117
        """Called when a connection is accepted."""
        # A request for a node identification should arrive.
118
        conn.expectMessage(timeout = 10, additional_timeout = 0)
119 120 121 122

    def timeoutExpired(self, conn):
        """Called when a timeout event occurs."""
        logging.debug('timeout expired for %s:%d', *(conn.getAddress()))
123
        self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)
124 125 126 127

    def connectionClosed(self, conn):
        """Called when a connection is closed by the peer."""
        logging.debug('connection closed for %s:%d', *(conn.getAddress()))
128
        self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)
129 130 131 132

    def peerBroken(self, conn):
        """Called when a peer is broken."""
        logging.error('%s:%d is broken', *(conn.getAddress()))
133
        self.connectionLost(conn, NodeStates.BROKEN)
134

135
    def connectionLost(self, conn, new_state):
136 137 138 139 140 141
        """ this is a method to override in sub-handlers when there is no need
        to make distinction from the kind event that closed the connection  """
        pass


    # Packet handlers.
Yoshinori Okuji's avatar
Yoshinori Okuji committed
142

143
    def requestIdentification(self, conn, packet, node_type,
144
                                        uuid, address, name):
145
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
146

147
    def acceptIdentification(self, conn, packet, node_type,
148
                       uuid, address, num_partitions, num_replicas, your_uuid):
149
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
150

151
    def askPrimary(self, conn, packet):
152
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
153

154
    def answerPrimary(self, conn, packet, primary_uuid,
155
                                  known_master_list):
156
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
157

158
    def announcePrimary(self, conn, packet):
159
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
160

161
    def reelectPrimary(self, conn, packet):
162
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
163

164
    def notifyNodeInformation(self, conn, packet, node_list):
165
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
166

167
    def askLastIDs(self, conn, packet):
168
        raise UnexpectedPacketError
169

170
    def answerLastIDs(self, conn, packet, loid, ltid, lptid):
171
        raise UnexpectedPacketError
172

173
    def askPartitionTable(self, conn, packet, offset_list):
174
        raise UnexpectedPacketError
175

176
    def answerPartitionTable(self, conn, packet, ptid, row_list):
177
        raise UnexpectedPacketError
178

179
    def sendPartitionTable(self, conn, packet, ptid, row_list):
180
        raise UnexpectedPacketError
181

182
    def notifyPartitionChanges(self, conn, packet, ptid, cell_list):
183
        raise UnexpectedPacketError
184

185
    def startOperation(self, conn, packet):
186
        raise UnexpectedPacketError
187

188
    def stopOperation(self, conn, packet):
189
        raise UnexpectedPacketError
190

191
    def askUnfinishedTransactions(self, conn, packet):
192
        raise UnexpectedPacketError
193

194
    def answerUnfinishedTransactions(self, conn, packet, tid_list):
195
        raise UnexpectedPacketError
196

197
    def askObjectPresent(self, conn, packet, oid, tid):
198
        raise UnexpectedPacketError
199

200
    def answerObjectPresent(self, conn, packet, oid, tid):
201
        raise UnexpectedPacketError
202

203
    def deleteTransaction(self, conn, packet, tid):
204
        raise UnexpectedPacketError
205

206
    def commitTransaction(self, conn, packet, tid):
207
        raise UnexpectedPacketError
208

209
    def askBeginTransaction(self, conn, packet, tid):
210
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
211

212
    def answerBeginTransaction(self, conn, packet, tid):
213
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
214

215
    def askNewOIDs(self, conn, packet, num_oids):
216
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
217

218
    def answerNewOIDs(self, conn, packet, num_oids):
219
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
220

221
    def finishTransaction(self, conn, packet, oid_list, tid):
222
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
223

224
    def notifyTransactionFinished(self, conn, packet, tid):
225
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
226

227
    def lockInformation(self, conn, packet, tid):
228
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
229

230
    def notifyInformationLocked(self, conn, packet, tid):
231
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
232

233
    def invalidateObjects(self, conn, packet, oid_list, tid):
234
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
235

236
    def unlockInformation(self, conn, packet, tid):
237
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
238

239
    def askStoreObject(self, conn, packet, oid, serial,
240
                             compression, checksum, data, tid):
241
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
242

243
    def answerStoreObject(self, conn, packet, conflicting, oid, serial):
244
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
245

246
    def abortTransaction(self, conn, packet, tid):
247
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
248

249
    def askStoreTransaction(self, conn, packet, tid, user, desc,
Aurel's avatar
Aurel committed
250
                                  ext, oid_list):
251
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
252

253
    def answerStoreTransaction(self, conn, packet, tid):
254
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
255

256
    def askObject(self, conn, packet, oid, serial, tid):
257
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
258

259
    def answerObject(self, conn, packet, oid, serial_start,
260
                           serial_end, compression, checksum, data):
261
        raise UnexpectedPacketError
262

263
    def askTIDs(self, conn, packet, first, last, partition):
264
        raise UnexpectedPacketError
265

266
    def answerTIDs(self, conn, packet, tid_list):
267
        raise UnexpectedPacketError
268

269
    def askTransactionInformation(self, conn, packet, tid):
270
        raise UnexpectedPacketError
271

272
    def answerTransactionInformation(self, conn, packet, tid, 
273
                                           user, desc, ext, oid_list):
274
        raise UnexpectedPacketError
275

276
    def askObjectHistory(self, conn, packet, oid, first, last):
277
        raise UnexpectedPacketError
278

279
    def answerObjectHistory(self, conn, packet, oid, history_list):
280
        raise UnexpectedPacketError
281

282
    def askOIDs(self, conn, packet, first, last, partition):
283
        raise UnexpectedPacketError
284

285
    def answerOIDs(self, conn, packet, oid_list):
286
        raise UnexpectedPacketError
287

288
    def askPartitionList(self, conn, packet, min_offset, max_offset, uuid):
289
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
290

291
    def answerPartitionList(self, conn, packet, ptid, row_list):
292
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
293

294
    def askNodeList(self, conn, packet, offset_list):
295
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
296

297
    def answerNodeList(self, conn, packet, node_list):
298
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
299

300
    def setNodeState(self, conn, packet, uuid, state, modify_partition_table):
301
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
302

303
    def answerNodeState(self, conn, packet, uuid, state):
304
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
305

306
    def addPendingNodes(self, conn, packet, uuid_list):
307 308
        raise UnexpectedPacketError

309
    def answerNewNodes(self, conn, packet, uuid_list):
310 311
        raise UnexpectedPacketError

312
    def askNodeInformation(self, conn, packet):
313 314
        raise UnexpectedPacketError

315 316
    def answerNodeInformation(self, conn, packet):
        # XXX: Just an acknowledge, to be removed
317 318
        raise UnexpectedPacketError

319
    def askClusterState(self, conn, packet):
320 321
        raise UnexpectedPacketError

322
    def answerClusterState(self, conn, packet, state):
323 324
        raise UnexpectedPacketError

325
    def setClusterState(self, conn, packet, state):
326 327
        raise UnexpectedPacketError

328
    def notifyClusterInformation(self, conn, packet, state):
329
        raise UnexpectedPacketError
330

331
    def notifyLastOID(self, conn, packet, oid):
332 333
        raise UnexpectedPacketError

334

Yoshinori Okuji's avatar
Yoshinori Okuji committed
335 336
    # Error packet handlers.

337
    def error(self, conn, packet, code, message):
338 339 340 341 342 343
        try:
            method = self.error_dispatch_table[code]
            method(conn, packet, message)
        except ValueError:
            raise UnexpectedPacketError(message)

344
    def notReady(self, conn, packet, message):
345 346
        raise UnexpectedPacketError

347
    def oidNotFound(self, conn, packet, message):
348 349
        raise UnexpectedPacketError

350
    def tidNotFound(self, conn, packet, message):
351
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
352

353
    def protocolError(self, conn, packet, message):
354 355
        # the connection should have been closed by the remote peer
        logging.error('protocol error: %s' % (message,))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
356

357
    def timeoutError(self, conn, packet, message):
358
        logging.error('timeout error: %s' % (message,))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
359

360
    def brokenNodeDisallowedError(self, conn, packet, message):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
361 362
        raise RuntimeError, 'broken node disallowed error: %s' % (message,)

363
    def noError(self, conn, packet, message):
364 365
        logging.debug("no error message : %s" % (message))

Aurel's avatar
Aurel committed
366

367 368
    # Fetch tables initialization

369
    def __initPacketDispatchTable(self):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
370 371
        d = {}

372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
        d[Packets.Error] = self.error
        d[Packets.RequestIdentification] = self.requestIdentification
        d[Packets.AcceptIdentification] = self.acceptIdentification
        d[Packets.AskPrimary] = self.askPrimary
        d[Packets.AnswerPrimary] = self.answerPrimary
        d[Packets.AnnouncePrimary] = self.announcePrimary
        d[Packets.ReelectPrimary] = self.reelectPrimary
        d[Packets.NotifyNodeInformation] = self.notifyNodeInformation
        d[Packets.AskLastIDs] = self.askLastIDs
        d[Packets.AnswerLastIDs] = self.answerLastIDs
        d[Packets.AskPartitionTable] = self.askPartitionTable
        d[Packets.AnswerPartitionTable] = self.answerPartitionTable
        d[Packets.SendPartitionTable] = self.sendPartitionTable
        d[Packets.NotifyPartitionChanges] = self.notifyPartitionChanges
        d[Packets.StartOperation] = self.startOperation
        d[Packets.StopOperation] = self.stopOperation
        d[Packets.AskUnfinishedTransactions] = self.askUnfinishedTransactions
        d[Packets.AnswerUnfinishedTransactions] = self.answerUnfinishedTransactions
        d[Packets.AskObjectPresent] = self.askObjectPresent
        d[Packets.AnswerObjectPresent] = self.answerObjectPresent
        d[Packets.DeleteTransaction] = self.deleteTransaction
        d[Packets.CommitTransaction] = self.commitTransaction
        d[Packets.AskBeginTransaction] = self.askBeginTransaction
        d[Packets.AnswerBeginTransaction] = self.answerBeginTransaction
        d[Packets.FinishTransaction] = self.finishTransaction
        d[Packets.NotifyTransactionFinished] = self.notifyTransactionFinished
        d[Packets.LockInformation] = self.lockInformation
        d[Packets.NotifyInformationLocked] = self.notifyInformationLocked
        d[Packets.InvalidateObjects] = self.invalidateObjects
        d[Packets.UnlockInformation] = self.unlockInformation
        d[Packets.AskNewOIDs] = self.askNewOIDs
        d[Packets.AnswerNewOIDs] = self.answerNewOIDs
        d[Packets.AskStoreObject] = self.askStoreObject
        d[Packets.AnswerStoreObject] = self.answerStoreObject
        d[Packets.AbortTransaction] = self.abortTransaction
        d[Packets.AskStoreTransaction] = self.askStoreTransaction
        d[Packets.AnswerStoreTransaction] = self.answerStoreTransaction
        d[Packets.AskObject] = self.askObject
        d[Packets.AnswerObject] = self.answerObject
        d[Packets.AskTIDs] = self.askTIDs
        d[Packets.AnswerTIDs] = self.answerTIDs
        d[Packets.AskTransactionInformation] = self.askTransactionInformation
        d[Packets.AnswerTransactionInformation] = self.answerTransactionInformation
        d[Packets.AskObjectHistory] = self.askObjectHistory
        d[Packets.AnswerObjectHistory] = self.answerObjectHistory
        d[Packets.AskOIDs] = self.askOIDs
        d[Packets.AnswerOIDs] = self.answerOIDs
        d[Packets.AskPartitionList] = self.askPartitionList
        d[Packets.AnswerPartitionList] = self.answerPartitionList
        d[Packets.AskNodeList] = self.askNodeList
        d[Packets.AnswerNodeList] = self.answerNodeList
        d[Packets.SetNodeState] = self.setNodeState
        d[Packets.AnswerNodeState] = self.answerNodeState
        d[Packets.SetClusterState] = self.setClusterState
        d[Packets.AddPendingNodes] = self.addPendingNodes
        d[Packets.AnswerNewNodes] = self.answerNewNodes
        d[Packets.AskNodeInformation] = self.askNodeInformation
        d[Packets.AnswerNodeInformation] = self.answerNodeInformation
        d[Packets.AskClusterState] = self.askClusterState
        d[Packets.AnswerClusterState] = self.answerClusterState
        d[Packets.NotifyClusterInformation] = self.notifyClusterInformation
        d[Packets.NotifyLastOID] = self.notifyLastOID
434

435
        return d
Yoshinori Okuji's avatar
Yoshinori Okuji committed
436

437
    def __initErrorDispatchTable(self):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
438
        d = {}
439

440 441 442 443 444 445
        d[ErrorCodes.NO_ERROR] = self.noError
        d[ErrorCodes.NOT_READY] = self.notReady
        d[ErrorCodes.OID_NOT_FOUND] = self.oidNotFound
        d[ErrorCodes.TID_NOT_FOUND] = self.tidNotFound
        d[ErrorCodes.PROTOCOL_ERROR] = self.protocolError
        d[ErrorCodes.BROKEN_NODE] = self.brokenNodeDisallowedError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
446

447 448
        return d