app.py 21.4 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
2
# Copyright (C) 2006-2012  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
Aurel's avatar
Aurel committed
16

17
import neo
18
import os, sys
Grégory Wisniewski's avatar
Grégory Wisniewski committed
19
from time import time
Yoshinori Okuji's avatar
Yoshinori Okuji committed
20

21 22
from neo.lib.connector import getConnectorHandler
from neo.lib.debug import register as registerLiveDebugger
23
from neo.lib.protocol import UUID_NAMESPACES, ZERO_TID, NotReadyError
24 25 26 27 28 29
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.node import NodeManager
from neo.lib.event import EventManager
from neo.lib.connection import ListeningConnection, ClientConnection
from neo.lib.exception import ElectionFailure, PrimaryFailure, OperationFailure
from neo.lib.util import dump
30 31 32 33

class StateChangedException(Exception):
    """Raised to interrupt the primary master's polling loop when the
    cluster state changes (caught in provideService for the transition
    to STARTING_BACKUP)."""

from .backup_app import BackupApplication
34 35 36 37 38 39
from .handlers import election, identification, secondary
from .handlers import administration, client, storage, shutdown
from .pt import PartitionTable
from .recovery import RecoveryManager
from .transactions import TransactionManager
from .verification import VerificationManager
Yoshinori Okuji's avatar
Yoshinori Okuji committed
40

41

42
class Application(object):
    """The master node application."""

    # Pack state; None while no pack is in progress.
    # NOTE(review): set elsewhere in the project — confirm what exactly
    # is stored here when a pack is running.
    packing = None
    # Latest completely committed TID
    last_transaction = ZERO_TID
    # TID up to which this cluster backs up an upstream cluster;
    # None when not operating as a backup (set in provideService).
    backup_tid = None
    # BackupApplication instance when an upstream cluster is configured
    # (see __init__), None otherwise.
    backup_app = None

50
    def __init__(self, config):
        """Build the master application from a configuration object.

        config provides cluster name, bind address, peer master list,
        partition-table geometry, UUID and optional upstream (backup)
        cluster description.
        """
        # Internal attributes.
        self.em = EventManager()
        self.nm = NodeManager(config.getDynamicMasterList())
        # The transaction manager calls back onTransactionCommitted once
        # all lock answers for a transaction have been received.
        self.tm = TransactionManager(self.onTransactionCommitted)

        self.name = config.getCluster()
        self.server = config.getBind()

        # UUIDs of storage nodes currently ready to handle transactions
        # (see setStorageReady/setStorageNotReady/isStorageReady).
        self.storage_readiness = set()
        master_addresses, connector_name = config.getMasters()
        self.connector_handler = getConnectorHandler(connector_name)
        # Register every configured peer master so the election can try
        # to reach them.
        for master_address in master_addresses:
            self.nm.createMaster(address=master_address)

        neo.lib.logging.debug('IP address is %s, port is %d', *(self.server))

        # Partition table
        replicas, partitions = config.getReplicas(), config.getPartitions()
        # NOTE(review): replicas == 0 is accepted despite the message
        # saying "positive integer" — it really means non-negative.
        if replicas < 0:
            raise RuntimeError, 'replicas must be a positive integer'
        if partitions <= 0:
            raise RuntimeError, 'partitions must be more than zero'
        self.pt = PartitionTable(partitions, replicas)
        neo.lib.logging.info('Configuration:')
        neo.lib.logging.info('Partitions: %d', partitions)
        neo.lib.logging.info('Replicas  : %d', replicas)
        neo.lib.logging.info('Name      : %s', self.name)

        self.listening_conn = None
        self.primary = None
        self.primary_master_node = None
        self.cluster_state = None
        self._startup_allowed = False

        # Generate an UUID for self
        uuid = config.getUUID()
        if uuid is None or uuid == '':
            uuid = self.getNewUUID(NodeTypes.MASTER)
        self.uuid = uuid
        neo.lib.logging.info('UUID      : %s', dump(uuid))

        # election related data
        self.unconnected_master_node_set = set()
        self.negotiating_master_node_set = set()

        # Manager (recovery/verification) currently driving the cluster,
        # None outside runManager (see identifyNode's storage branch).
        self._current_manager = None

        # backup
        upstream_cluster = config.getUpstreamCluster()
        if upstream_cluster:
            if upstream_cluster == self.name:
                raise ValueError("upstream cluster name must be"
                                 " different from cluster name")
            self.backup_app = BackupApplication(self, upstream_cluster,
                                                *config.getUpstreamMasters())

        # Dump the full application state when the live debugger logs.
        registerLiveDebugger(on_log=self.log)

109 110
    def close(self):
        """Release every resource held by the application.

        The instance must not be used afterwards: its whole attribute
        dictionary is dropped so that any later access fails loudly.
        """
        self.listening_conn = None
        backup_app = self.backup_app
        if backup_app is not None:
            backup_app.close()
        for component in (self.nm, self.em):
            component.close()
        del self.__dict__

117 118
    def log(self):
        """Dump the state of every sub-component for debugging."""
        self.em.log()
        backup_app = self.backup_app
        if backup_app is not None:
            backup_app.log()
        self.nm.log()
        self.tm.log()
        pt = self.pt
        if pt is not None:
            pt.log()

Yoshinori Okuji's avatar
Yoshinori Okuji committed
126
    def run(self):
        """Run the master node, dumping pre-mortem state on any failure.

        The bare ``except`` is deliberate: the full application state is
        logged before the exception is re-raised to the caller.
        """
        try:
            self._run()
        except:
            neo.lib.logging.exception('Pre-mortem data:')
            self.log()
            raise

    def _run(self):
        """Make sure that the status is sane and start a loop."""
        # Make a listening port.
        self.listening_conn = ListeningConnection(self.em, None,
            addr=self.server, connector=self.connector_handler())

        # Start a normal operation.
        while True:
            # (Re)elect a new primary master.
            # With no other master configured, we are primary without an
            # election round.
            self.primary = not self.nm.getMasterList()
            if not self.primary:
                self.electPrimary()
            try:
                if self.primary:
                    self.playPrimaryRole()
                else:
                    self.playSecondaryRole()
                # Both roles loop forever; returning from them is a bug.
                raise RuntimeError, 'should not reach here'
            except (ElectionFailure, PrimaryFailure):
                # The election or the current primary failed: drop all
                # outgoing connections and loop to re-elect.
                # Forget all connections.
                for conn in self.em.getClientList():
                    conn.close()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
156

157

158
    def electPrimary(self):
        """Elect a primary master node.

        The difficulty is that a master node must accept connections from
        others while attempting to connect to other master nodes at the
        same time. Note that storage nodes and client nodes may connect
        to self as well as master nodes."""
        neo.lib.logging.info('begin the election of a primary master')

        client_handler = election.ClientElectionHandler(self)
        self.unconnected_master_node_set.clear()
        self.negotiating_master_node_set.clear()
        self.listening_conn.setHandler(election.ServerElectionHandler(self))
        getByAddress = self.nm.getByAddress

        while True:

            # handle new connected masters
            for node in self.nm.getMasterList():
                node.setUnknown()
                self.unconnected_master_node_set.add(node.getAddress())

            # start the election process
            self.primary = None
            self.primary_master_node = None
            try:
                # Keep polling until every peer has either been contacted
                # or finished negotiating; the election handlers drain
                # negotiating_master_node_set as answers arrive.
                while (self.unconnected_master_node_set or
                        self.negotiating_master_node_set):
                    for addr in self.unconnected_master_node_set:
                        ClientConnection(self.em, client_handler,
                            # XXX: Ugly, but the whole election code will be
                            # replaced soon
                            node=getByAddress(addr),
                            connector=self.connector_handler())
                        self.negotiating_master_node_set.add(addr)
                    self.unconnected_master_node_set.clear()
                    self.em.poll(1)
            except ElectionFailure, m:
                # something goes wrong, clean then restart
                neo.lib.logging.error('election failed: %s', (m, ))

                # Ask all connected nodes to reelect a single primary master.
                for conn in self.em.getClientList():
                    conn.notify(Packets.ReelectPrimary())
                    conn.abort()

                # Wait until the connections are closed.
                self.primary = None
                self.primary_master_node = None
                # Give peers at most 10 seconds to close on their side.
                t = time() + 10
                while self.em.getClientList() and time() < t:
                    try:
                        self.em.poll(1)
                    except ElectionFailure:
                        pass

                # Close all connections.
                for conn in self.em.getClientList() + self.em.getServerList():
                    conn.close()
            else:
                # election succeed, stop the process
                # If no handler designated another node as primary,
                # self.primary is still None here and we won the election
                # (presumably the election handlers set it otherwise —
                # confirm against the handlers in .handlers.election).
                self.primary = self.primary is None
                break

222 223 224 225 226 227 228 229
    def broadcastNodesInformation(self, node_list):
        """
          Broadcast changes for a set of nodes
          Send only one packet per connection to reduce bandwidth
        """
        # Group modified nodes by destination node type.  The original
        # defined a helper closure inside the loop body, re-creating a
        # function object per node; direct setdefault calls are simpler
        # and behave identically.
        node_dict = {}
        for node in node_list:
            node_info = node.asTuple()
            if node.isMaster() or node.isStorage():
                # clients get notifications for master and storage only
                node_dict.setdefault(NodeTypes.CLIENT, []).append(node_info)
            if node.isMaster() or node.isStorage() or node.isClient():
                node_dict.setdefault(NodeTypes.STORAGE, []).append(node_info)
                node_dict.setdefault(NodeTypes.ADMIN, []).append(node_info)

        # send at most one non-empty notification packet per node
        for node in self.nm.getIdentifiedList():
            node_list = node_dict.get(node.getType(), [])
            if node_list and node.isRunning():
                node.notify(Packets.NotifyNodeInformation(node_list))
246

247
    def broadcastPartitionChanges(self, cell_list, selector=None):
        """Broadcast a Notify Partition Changes packet.

        cell_list is the set of modified cells; selector, when given,
        filters which identified running nodes receive the packet (by
        default: clients, storages and admins).
        """
        neo.lib.logging.debug('broadcastPartitionChanges')
        if not cell_list:
            return
        if not selector:
            def selector(node):
                return node.isClient() or node.isStorage() or node.isAdmin()
        self.pt.log()
        packet = Packets.NotifyPartitionChanges(self.pt.setNextID(), cell_list)
        for node in self.nm.getIdentifiedList():
            if node.isRunning() and selector(node):
                node.notify(packet)
262

263 264
    def broadcastLastOID(self):
        """Notify every identified storage node of the last allocated OID."""
        oid = self.tm.getLastOID()
        # Lazy %-style arguments (like every other log call in this file):
        # the original eagerly built the message with '%' even when the
        # debug level is disabled.
        neo.lib.logging.debug('Broadcast last OID to storages : %s', dump(oid))
        packet = Packets.NotifyLastOID(oid)
        for node in self.nm.getStorageList(only_identified=True):
            node.notify(packet)
270

Yoshinori Okuji's avatar
Yoshinori Okuji committed
271
    def provideService(self):
        """
        This is the normal mode for a primary master node. Handle transactions
        and stop the service only if a catastrophy happens or the user commits
        a shutdown.
        """
        neo.lib.logging.info('provide service')
        poll = self.em.poll
        self.tm.reset()

        self.changeClusterState(ClusterStates.RUNNING)

        # Now everything is passive.
        try:
            while True:
                poll(1)
        except OperationFailure:
            # If not operational, send Stop Operation packets to storage
            # nodes and client nodes. Abort connections to client nodes.
            neo.lib.logging.critical('No longer operational')
        except StateChangedException, e:
            # Only the transition to backup mode is expected to interrupt
            # the service loop this way.
            assert e.args[0] == ClusterStates.STARTING_BACKUP
            # Snapshot the last committed TID as the starting point of the
            # backup, for every identified storage node.
            self.backup_tid = tid = self.getLastTransaction()
            self.pt.setBackupTidDict(dict((node.getUUID(), tid)
                for node in self.nm.getStorageList(only_identified=True)))
296

297
    def playPrimaryRole(self):
        """Act as the primary master: announce primarity, recover, verify,
        then provide service (or backup) forever."""
        neo.lib.logging.info(
                        'play the primary role with %r', self.listening_conn)
        # Tell every currently connected peer that we won the election,
        # then drop those connections.
        packet = Packets.AnnouncePrimary()
        for conn in self.em.getClientList():
            conn.notify(packet)
            conn.abort()
        self.listening_conn.setHandler(
                identification.IdentificationHandler(self))

        em = self.em
        nm = self.nm

        # Close all remaining connections to other masters,
        # for the same reason as in playSecondaryRole.
        for conn in em.getConnectionList():
            conn_uuid = conn.getUUID()
            if conn_uuid is not None:
                node = nm.getByUUID(conn_uuid)
                assert node is not None
                assert node.isMaster() and not conn.isClient()
                assert node.isUnknown()
                # this may trigger 'unexpected answer' warnings on remote side
                conn.close()

        # If I know any storage node, make sure that they are not in the
        # running state, because they are not connected at this stage.
        for node in nm.getStorageList():
            if node.isRunning():
                node.setTemporarilyDown()

        # recover the cluster status at startup
        self.runManager(RecoveryManager)
        while True:
            self.runManager(VerificationManager)
            # A non-None backup_tid means this cluster is a backup of an
            # upstream cluster, which requires backup_app (configured in
            # __init__ from the upstream_cluster setting).
            if self.backup_tid:
                if self.backup_app is None:
                    raise RuntimeError("No upstream cluster to backup"
                                       " defined in configuration")
                self.backup_app.provideService()
            else:
                self.provideService()
            # Service stopped (OperationFailure or state change): tell
            # storages and clients to stop, dropping client connections.
            for node in self.nm.getIdentifiedList():
                if node.isStorage() or node.isClient():
                    node.notify(Packets.StopOperation())
                    if node.isClient():
                        node.getConnection().abort()
344 345

    def playSecondaryRole(self):
        """
        I play a secondary role, thus only wait for a primary master to fail.
        """
        neo.lib.logging.info('play the secondary role with %r',
            self.listening_conn)

        # Wait for an announcement. If this is too long, probably
        # the primary master is down.
        t = time() + 10
        while self.primary_master_node is None:
            self.em.poll(1)
            if t < time():
                # election timeout
                raise ElectionFailure("Election timeout")

        # Restart completely. Non-optimized
        # but lower level code needs to be stabilized first.
        for conn in self.em.getConnectionList():
            if not conn.isListening():
                conn.close()

        # Reconnect to primary master node.
        primary_handler = secondary.PrimaryHandler(self)
        ClientConnection(self.em, primary_handler,
            node=self.primary_master_node,
            connector=self.connector_handler())

        # and another for the future incoming connections
        self.listening_conn.setHandler(
            identification.SecondaryIdentificationHandler(self))

        # Loop forever; a primary failure is reported by raising
        # PrimaryFailure from the handlers, caught in _run.
        while True:
            self.em.poll(1)
379

380 381 382 383 384
    def runManager(self, manager_klass):
        """Instantiate *manager_klass* with this application and run it,
        exposing it as self._current_manager for the duration of the run
        (left in place if run() raises, as before)."""
        manager = manager_klass(self)
        self._current_manager = manager
        manager.run()
        self._current_manager = None

385
    def changeClusterState(self, state):
        """
        Change the cluster state and apply right handler on each connections
        """
        # No-op when the state does not actually change.
        if self.cluster_state == state:
            return

        # select the storage handler
        client_handler = client.ClientServiceHandler(self)
        if state in (ClusterStates.RUNNING, ClusterStates.STARTING_BACKUP,
                     ClusterStates.BACKINGUP, ClusterStates.STOPPING_BACKUP):
            storage_handler = storage.StorageServiceHandler(self)
        elif self._current_manager is not None:
            # During recovery/verification, the running manager supplies
            # the storage handler.
            storage_handler = self._current_manager.getHandler()
        else:
            raise RuntimeError('Unexpected cluster state')

        # change handlers
        notification_packet = Packets.NotifyClusterInformation(state)
        for node in self.nm.getIdentifiedList():
            # Peer masters keep their own handler.
            if node.isMaster():
                continue
            conn = node.getConnection()
            node.notify(notification_packet)
            if node.isClient():
                # Clients are only served while RUNNING.
                if state != ClusterStates.RUNNING:
                    conn.abort()
                    continue
                handler = client_handler
            elif node.isStorage():
                handler = storage_handler
            else:
                continue # keep handler
            # Only swap when the handler class actually differs, so that
            # connectionCompleted is not re-run needlessly.
            if type(handler) is not type(conn.getLastHandler()):
                conn.setHandler(handler)
                handler.connectionCompleted(conn)
        self.cluster_state = state

423
    def getNewUUID(self, node_type):
        """Return a fresh random UUID prefixed by the namespace byte of
        *node_type*.

        Raises RuntimeError for a node type with no UUID namespace.
        """
        try:
            return UUID_NAMESPACES[node_type] + os.urandom(15)
        except KeyError:
            # Call form instead of the Python-2-only "raise E, msg"
            # statement: identical behavior, forward-portable syntax.
            raise RuntimeError('No UUID namespace found for this node type')

    def isValidUUID(self, uuid, addr):
        """Tell whether *uuid* may be used by a node connecting from *addr*.

        The master's own UUID is never valid for another node. Otherwise
        the UUID is accepted when it is unknown, or already registered
        for the same address (or for no address at all).
        """
        if uuid == self.uuid:
            return False
        node = self.nm.getByUUID(uuid)
        if node is None:
            return True
        return node.getAddress() in (None, addr)
434

435 436
    def getClusterState(self):
        """Return the current cluster state (None before it is first set)."""
        return self.cluster_state
437

438 439
    def shutdown(self):
        """Close all connections and exit"""
        # XXX: This behaviour is probably broken, as it applies the same
        #   handler to all connection types. It must be carefuly reviewed and
        #   corrected.
        # change handler
        handler = shutdown.ShutdownHandler(self)
        for node in self.nm.getIdentifiedList():
            node.getConnection().setHandler(handler)

        # wait for all transaction to be finished
        while self.tm.hasPending():
            self.em.poll(1)

        if self.cluster_state != ClusterStates.RUNNING:
            neo.lib.logging.info("asking all nodes to shutdown")
            # This code sends packets but never polls, so they never reach
            # network.
            for node in self.nm.getIdentifiedList():
                notification = Packets.NotifyNodeInformation([node.asTuple()])
                # The original had two branches (client / storage-or-master)
                # with identical bodies; collapsed into one condition.
                # Admin nodes are still not notified, as before.
                if node.isClient() or node.isStorage() or node.isMaster():
                    node.notify(notification)

        # then shutdown
        sys.exit()
465 466

    def identifyStorageNode(self, uuid, node):
        """Decide how a connecting storage node is identified.

        Raises NotReadyError while the cluster is stopping. A storage
        without a UUID, or unknown to the node manager, is admitted in
        PENDING state instead of RUNNING (same as for verification).
        """
        if self.cluster_state == ClusterStates.STOPPING:
            raise NotReadyError
        if uuid is None or node is None:
            state = NodeStates.PENDING
        else:
            state = NodeStates.RUNNING
        return uuid, state, storage.StorageServiceHandler(self)
474 475 476

    def identifyNode(self, node_type, uuid, node):
        """Decide whether and how an incoming node is accepted.

        Returns a (uuid, node, state, handler, node_ctor) tuple where
        node_ctor is the NodeManager factory to register the node with.
        Raises NotReadyError to reject the connection.
        """

        state = NodeStates.RUNNING

        if node_type == NodeTypes.ADMIN:
            # always accept admin nodes
            node_ctor = self.nm.createAdmin
            handler = administration.AdministrationHandler(self)
            neo.lib.logging.info('Accept an admin %s' % (dump(uuid), ))
        elif node_type == NodeTypes.MASTER:
            # always put other master in waiting state
            node_ctor = self.nm.createMaster
            handler = secondary.SecondaryMasterHandler(self)
            neo.lib.logging.info('Accept a master %s' % (dump(uuid), ))
        elif node_type == NodeTypes.CLIENT:
            # refuse any client before running
            if self.cluster_state != ClusterStates.RUNNING:
                neo.lib.logging.info('Reject a connection from a client')
                raise NotReadyError
            node_ctor = self.nm.createClient
            handler = client.ClientServiceHandler(self)
            neo.lib.logging.info('Accept a client %s' % (dump(uuid), ))
        elif node_type == NodeTypes.STORAGE:
            node_ctor = self.nm.createStorage
            # During recovery/verification the current manager decides how
            # storages are identified; otherwise use our own policy.
            manager = self._current_manager
            if manager is None:
                manager = self
            (uuid, state, handler) = manager.identifyStorageNode(uuid, node)
            neo.lib.logging.info('Accept a storage %s (%s)' %
                            (dump(uuid), state))
        else:
            # NOTE(review): node_ctor is never bound in this branch, so
            # the return below raises UnboundLocalError for any unknown
            # node type — confirm whether this path is actually reachable.
            handler = identification.IdentificationHandler(self)
        return (uuid, node, state, handler, node_ctor)
508

509
    def onTransactionCommitted(self, txn):
        """Finalize a transaction once all lock answers have been received.

        Called back by the transaction manager (registered in __init__).
        """
        # I have received all the lock answers now:
        # - send a Notify Transaction Finished to the initiated client node
        # - Invalidate Objects to the other client nodes
        ttid = txn.getTTID()
        tid = txn.getTID()
        transaction_node = txn.getNode()
        invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
        transaction_finished = Packets.AnswerTransactionFinished(ttid, tid)
        for client_node in self.nm.getClientList(only_identified=True):
            c = client_node.getConnection()
            if client_node is transaction_node:
                # Answer the committing client on its original request.
                c.answer(transaction_finished, msg_id=txn.getMessageId())
            else:
                c.notify(invalidate_objects)

        # Unlock Information to relevant storage nodes.
        notify_unlock = Packets.NotifyUnlockInformation(ttid)
        getByUUID = self.nm.getByUUID
        for storage_uuid in txn.getUUIDList():
            getByUUID(storage_uuid).getConnection().notify(notify_unlock)

        # Notify storage that have replications blocked by this transaction
        notify_finished = Packets.NotifyTransactionFinished(ttid, tid)
        for storage_uuid in txn.getNotificationUUIDList():
            node = getByUUID(storage_uuid)
            # These nodes may have disconnected meanwhile; skip them.
            if node is not None and node.isConnected():
                node.getConnection().notify(notify_finished)

        # remove transaction from manager
        self.tm.remove(transaction_node.getUUID(), ttid)
        self.setLastTransaction(tid)

542 543 544 545 546 547 548 549
    def getLastTransaction(self):
        """Return the TID of the latest completely committed transaction."""
        return self.last_transaction

    def setLastTransaction(self, tid):
        """Record *tid* as the latest completely committed TID.

        TIDs must never regress; a smaller tid indicates a bug upstream.
        """
        assert tid >= self.last_transaction, (tid, self.last_transaction)
        self.last_transaction = tid

550 551 552 553 554 555 556 557 558
    def setStorageNotReady(self, uuid):
        """Forget that storage node *uuid* is ready (no-op if unknown)."""
        self.storage_readiness.discard(uuid)

    def setStorageReady(self, uuid):
        """Mark storage node *uuid* as ready to handle transactions."""
        self.storage_readiness.add(uuid)

    def isStorageReady(self, uuid):
        """Tell whether storage node *uuid* has been marked ready."""
        return uuid in self.storage_readiness