#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

from neo import logging
import os, sys
from time import time
from struct import pack, unpack

from neo import protocol
from neo.protocol import UUID_NAMESPACES
from neo.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.node import NodeManager
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection
from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, \
        OperationFailure
from neo.master.handlers import election, identification, secondary, recovery
from neo.master.handlers import verification, storage, client, shutdown
from neo.master.handlers import administration
from neo.master.pt import PartitionTable
from neo.util import dump, parseMasterList
from neo.connector import getConnectorHandler

REQUIRED_NODE_NUMBER = 1

class Application(object):
    """The master node application."""

    def __init__(self, cluster, bind, masters, replicas, partitions, uuid):

        # always use default connector for now
        self.connector_handler = getConnectorHandler()

        # set the cluster name
        if cluster is None:
            raise RuntimeError, 'cluster name must be non-empty'
        self.name = cluster

        # set the bind address
        ip_address, port = bind.split(':')
        self.server = (ip_address, int(port))
        logging.debug('IP address is %s, port is %d', *(self.server))

        # load master node list
        self.master_node_list = parseMasterList(masters, self.server)
        logging.debug('master nodes are %s', self.master_node_list)

        # Internal attributes.
        self.em = EventManager()
        self.nm = NodeManager()

        # Partition table
        if replicas < 0:
            raise RuntimeError, 'replicas must be a non-negative integer'
        if partitions <= 0:
            raise RuntimeError, 'partitions must be more than zero'
        self.pt = PartitionTable(partitions, replicas)
        logging.debug('the number of replicas is %d, the number of partitions '
                      'is %d, the name is %s', replicas, partitions, self.name)

        self.listening_conn = None
        self.primary = None
        self.primary_master_node = None
        self.cluster_state = None

        # Generate a UUID for self
        if uuid is None:
            uuid = self.getNewUUID(NodeTypes.MASTER)
        self.uuid = uuid

        # The last OID.
        self.loid = None
        # The last TID.
        self.ltid = None
        # The target node's uuid to request next.
        self.target_uuid = None

        # election related data
        self.unconnected_master_node_set = set()
        self.negotiating_master_node_set = set()

        # verification related data
        self.unfinished_oid_set = set()
        self.unfinished_tid_set = set()
        self.asking_uuid_dict = {}
        self.object_present = False

        # service related data
        self.finishing_transaction_dict = {}


    def run(self):
        """Make sure that the status is sane and start a loop."""
        for address in self.master_node_list:
            self.nm.createMaster(address=address)

        # Make a listening port.
        self.listening_conn = ListeningConnection(self.em, None,
            addr = self.server, connector_handler = self.connector_handler)

        self.cluster_state = ClusterStates.BOOTING
        # Start the election of a primary master node.
        self.electPrimary()

        # Start a normal operation.
        while 1:
            try:
                if self.primary:
                    self.playPrimaryRole()
                else:
                    self.playSecondaryRole()
                raise RuntimeError, 'should not reach here'
            except (ElectionFailure, PrimaryFailure):
                # Forget all connections.
                for conn in self.em.getClientList():
                    conn.close()
                # Reelect a new primary master.
                self.electPrimary(bootstrap = False)

    def electPrimary(self, bootstrap = True):
        """Elect a primary master node.

        The difficulty is that a master node must accept connections from
        others while attempting to connect to other master nodes at the
        same time. Note that storage nodes and client nodes may connect
        to self as well as master nodes."""
        logging.info('begin the election of a primary master')

        self.unconnected_master_node_set = set()
        self.negotiating_master_node_set = set()
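        # The election handlers move addresses out of these two sets as
        # connections are established and peers answer; the polling loop
        # below runs until both sets are drained.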
        self.listening_conn.setHandler(election.ServerElectionHandler(self))
        client_handler = election.ClientElectionHandler(self)
        em = self.em
        nm = self.nm

        for node in nm.getMasterList():
            # For now, believe that every node should be available,
            # since down or broken nodes may be already repaired.
            node.setRunning()

        while 1:
            t = 0
            self.primary = None
            self.primary_master_node = None

            for node in nm.getMasterList():
                if node.isRunning():
                    self.unconnected_master_node_set.add(node.getAddress())

            # Wait at most 20 seconds at bootstrap. Otherwise, wait at most
            # 10 seconds to avoid stopping the whole cluster for a long time.
            # Note that even if not all masters are up in the first 20
            # seconds, this is not an issue, because the first one up will
            # time out and take the primary role.
            if bootstrap:
                expiration = 20
            else:
                expiration = 10

            try:
                while 1:
                    current_time = time()
                    if current_time >= t + 1:
                        t = current_time
                        for node in nm.getMasterList():
                            if node.isTemporarilyDown() \
                                    and node.getLastStateChange() + expiration < current_time:
                                logging.info('%s is down' % (node, ))
                                node.setDown()
                                self.unconnected_master_node_set.discard(node.getAddress())

                        # Try to connect to master nodes.
                        if self.unconnected_master_node_set:
                            for addr in list(self.unconnected_master_node_set):
                                ClientConnection(em, client_handler, addr = addr,
                                                 connector_handler = self.connector_handler)
                    em.poll(1)
                    if len(self.unconnected_master_node_set) == 0 \
                       and len(self.negotiating_master_node_set) == 0:
                        break

                # Now there are three situations:
                #   - I am the primary master
                #   - I am secondary but don't know who is primary
                #   - I am secondary and know who is primary
                if self.primary is None:
                    # I am the primary.
                    self.primary = True
                    logging.debug('I am the primary, so sending an announcement')
                    for conn in em.getClientList():
                        conn.notify(Packets.AnnouncePrimary())
                        conn.abort()
                    t = time()
                    while em.getClientList():
                        em.poll(1)
                        if t + 10 < time():
                            for conn in em.getClientList():
                                conn.close()
                            break
                else:
                    # Wait for an announcement. If this is too long, probably
                    # the primary master is down.
                    t = time()
                    while self.primary_master_node is None:
                        em.poll(1)
                        if t + 10 < time():
                            raise ElectionFailure, 'no primary master elected'

                    # Now I need only a connection to the primary master node.
                    primary = self.primary_master_node
                    addr = primary.getAddress()
                    for conn in em.getServerList():
                        conn.close()
                    for conn in em.getClientList():
                        if conn.getAddress() != addr:
                            conn.close()

                    # But if there is no such connection, something went wrong.
                    for conn in em.getClientList():
                        if conn.getAddress() == addr:
                            break
                    else:
                        raise ElectionFailure, 'no connection remains to the primary'

                return
            except ElectionFailure, m:
                logging.error('election failed; %s' % m)

                # Ask all connected nodes to reelect a single primary master.
                for conn in em.getClientList():
                    conn.notify(Packets.ReelectPrimary())
                    conn.abort()

                # Wait until the connections are closed.
                self.primary = None
                self.primary_master_node = None
                t = time()
                while em.getClientList():
                    try:
                        em.poll(1)
                    except ElectionFailure:
                        pass
                    if time() > t + 10:
                        # If too long, do not wait.
                        break

                # Close all connections.
                for conn in em.getClientList():
                    conn.close()
                for conn in em.getServerList():
                    conn.close()
                bootstrap = False

    # XXX: should accept a node list and send at most one packet per peer
    def broadcastNodeInformation(self, node):
        """Broadcast a Notify Node Information packet."""
        logging.debug('broadcasting node information')
        node_type = node.getType()
        state = node.getState()
        uuid = node.getUUID()

        # The server address may be None.
        address = node.getAddress()

        if node.isClient():
            # Notify only master, storage and admin nodes about clients.
            for c in self.em.getConnectionList():
                if c.getUUID() is not None:
                    n = self.nm.getByUUID(c.getUUID())
                    if n.isMaster() or n.isStorage() or n.isAdmin():
                        node_list = [(node_type, address, uuid, state)]
                        c.notify(Packets.NotifyNodeInformation(node_list))
        elif node.isMaster() or node.isStorage():
            for c in self.em.getConnectionList():
                if c.getUUID() is not None:
                    node_list = [(node_type, address, uuid, state)]
                    c.notify(Packets.NotifyNodeInformation(node_list))
        elif not node.isAdmin():
            raise RuntimeError('unknown node type')

    def broadcastPartitionChanges(self, ptid, cell_list):
        """Broadcast a Notify Partition Changes packet."""
        # XXX: don't send if cell_list is empty, to have a unique check
        logging.debug('broadcastPartitionChanges')
        self.pt.log()
        for c in self.em.getConnectionList():
            n = self.nm.getByUUID(c.getUUID())
            if n is None:
                continue
            if n.isClient() or n.isStorage() or n.isAdmin():
                # Split the packet if too big. Slice cell_list instead of
                # reassigning it, so that later chunks and later connections
                # still see the full list.
                size = len(cell_list)
                start = 0
                while size:
                    amt = min(10000, size)
                    p = Packets.NotifyPartitionChanges(ptid,
                            cell_list[start:start+amt])
                    c.notify(p)
                    size -= amt
                    start += amt
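                # For example, 25000 changed cells go out as three packets
                # carrying 10000, 10000 and 5000 cells respectively.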

    def outdateAndBroadcastPartition(self):
        """ Outdate cells of non-working nodes and broadcast changes """
        cell_list = self.pt.outdate()
        if cell_list:
            self.broadcastPartitionChanges(self.pt.setNextID(), cell_list)

    def sendPartitionTable(self, conn):
        """ Send the partition table through the given connection """
        row_list = []
        for offset in xrange(self.pt.getPartitions()):
            row_list.append((offset, self.pt.getRow(offset)))
            # Split the packet if too huge.
            if len(row_list) == 1000:
                conn.notify(Packets.SendPartitionTable(self.pt.getID(), row_list))
                del row_list[:]
        if row_list:
            conn.notify(Packets.SendPartitionTable(self.pt.getID(), row_list))

    def sendNodesInformations(self, conn):
        """ Send information about all nodes through the given connection """
        node_list = []
        for n in self.nm.getList():
            if not n.isAdmin():
                node_list.append(n.asTuple())
                # Split the packet if too huge.
                if len(node_list) == 10000:
                    conn.notify(Packets.NotifyNodeInformation(node_list))
                    del node_list[:]
        if node_list:
            conn.notify(Packets.NotifyNodeInformation(node_list))

    def broadcastLastOID(self, oid):
        logging.debug('Broadcast last OID to storages: %s' % dump(oid))
        packet = Packets.NotifyLastOID(oid)
        for conn in self.em.getConnectionList():
            node = self.nm.getByUUID(conn.getUUID())
            if node is not None and node.isStorage():
                conn.notify(packet)

    def buildFromScratch(self):
        nm, em, pt = self.nm, self.em, self.pt
        logging.debug('creating a new partition table, wait for a storage node')
        # wait for some empty storage nodes; they are accepted
        while len(nm.getStorageList()) < REQUIRED_NODE_NUMBER:
            em.poll(1)
        # take the first nodes available
        node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER]
        for node in node_list:
            node.setRunning()
            self.broadcastNodeInformation(node)
        # reset the ID generators
        self.loid = '\0' * 8
        self.ltid = '\0' * 8
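        # ('\0' * 8 is the zero ID: OIDs and TIDs are 8-byte big-endian
        # strings handled with struct pack/unpack.)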
        # build the partition with these nodes
        pt.setID(pack('!Q', 1))
        pt.make(node_list)

    def recoverStatus(self):
        """Recover the status of the cluster. Obtain the last OID, the last
        TID, and the last Partition Table ID from storage nodes, then get
        back the latest partition table or make a new table from scratch,
        if this is the first time."""
        logging.info('begin the recovery of the status')

        self.changeClusterState(ClusterStates.RECOVERING)
        em = self.em

        self.loid = None
        self.ltid = None
        self.pt.setID(None)
        self.target_uuid = None

        # collect the last partition table available
        while self.cluster_state == ClusterStates.RECOVERING:
            em.poll(1)

        logging.info('startup allowed')

        # build a new partition table
        if self.pt.getID() is None:
            self.buildFromScratch()

        # collect nodes that are connected but not in the selected partition
        # table and set them in pending state
        allowed_node_set = set(self.pt.getNodeList())
        refused_node_set = set(self.nm.getStorageList()) - allowed_node_set
        for node in refused_node_set:
            node.setPending()
            self.broadcastNodeInformation(node)

        logging.debug('cluster starts with loid=%s and this partition table:',
                dump(self.loid))
        self.pt.log()

    def verifyTransaction(self, tid):
        """Return the set of UUIDs of the storage nodes that hold this
        transaction and all of its objects, or None if the transaction is
        not committable and must be dropped."""
        em = self.em
        uuid_set = set()

        # Determine which nodes to ask.
        partition = self.pt.getPartition(tid)
        transaction_uuid_list = [cell.getUUID() for cell \
                in self.pt.getCellList(partition, readable=True)]
        if len(transaction_uuid_list) == 0:
            raise VerificationFailure
        uuid_set.update(transaction_uuid_list)

        # Gather OIDs.
        self.asking_uuid_dict = {}
        self.unfinished_oid_set = set()
        for conn in em.getConnectionList():
            uuid = conn.getUUID()
            if uuid in transaction_uuid_list:
                self.asking_uuid_dict[uuid] = False
                conn.ask(Packets.AskTransactionInformation(tid))
        if len(self.asking_uuid_dict) == 0:
            raise VerificationFailure

        while 1:
            em.poll(1)
            if not self.pt.operational():
                raise VerificationFailure
            if False not in self.asking_uuid_dict.values():
                break

        if self.unfinished_oid_set is None or len(self.unfinished_oid_set) == 0:
            # Not committable.
            return None
        else:
            # Verify that all objects are present.
            for oid in self.unfinished_oid_set:
                self.asking_uuid_dict.clear()
                partition = self.pt.getPartition(oid)
                object_uuid_list = [cell.getUUID() for cell \
                            in self.pt.getCellList(partition, readable=True)]
                if len(object_uuid_list) == 0:
                    raise VerificationFailure
                uuid_set.update(object_uuid_list)

                self.object_present = True
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid in object_uuid_list:
                        self.asking_uuid_dict[uuid] = False
                        conn.ask(Packets.AskObjectPresent(oid, tid))

                while 1:
                    em.poll(1)
                    if not self.pt.operational():
                        raise VerificationFailure
                    if False not in self.asking_uuid_dict.values():
                        break

                if not self.object_present:
                    # Not committable.
                    return None

        return uuid_set

    def verifyData(self):
        """Verify the data in storage nodes and clean them up, if necessary."""

        em, nm = self.em, self.nm
        self.changeClusterState(ClusterStates.VERIFYING)

        # wait for any missing node
        logging.debug('waiting for the cluster to be operational')
        while not self.pt.operational():
            em.poll(1)

        logging.info('start to verify data')

        # Gather all unfinished transactions.
        #
        # FIXME this part requires more brainstorming. Currently, this deals
        # only with unfinished transactions. But how about finished
        # transactions? Suppose that A and B have an unfinished transaction.
        # First, A and B are asked to commit the transaction. Then, A
        # succeeds. B goes down. Now, A believes that the transaction has
        # been committed, while B still believes that the transaction is
        # unfinished. Next, if B comes back and A is working, no problem;
        # B's unfinished transaction will be committed correctly. However,
        # when B comes back, if A is down, what happens? If the state is not
        # very good, B may be asked to abort the transaction!
        #
        # This situation won't happen frequently, and B shouldn't be asked
        # to drop the transaction, if the cluster is not ready. However,
        # there might be some corner cases where this may happen. That's
        # why more brainstorming is required.
        self.asking_uuid_dict = {}
        self.unfinished_tid_set = set()
        for conn in em.getConnectionList():
            uuid = conn.getUUID()
            if uuid is not None:
                node = nm.getByUUID(uuid)
                if node.isStorage():
                    self.asking_uuid_dict[uuid] = False
                    conn.ask(Packets.AskUnfinishedTransactions())

        while 1:
            em.poll(1)
            if not self.pt.operational():
                raise VerificationFailure
            if False not in self.asking_uuid_dict.values():
                break

        # Gather OIDs for each unfinished TID, and verify whether the
        # transaction can be finished or must be aborted. This could be done
        # in parallel in theory, but not so easily. Thus do it one-by-one
        # at the moment.
        for tid in self.unfinished_tid_set:
            uuid_set = self.verifyTransaction(tid)
            if uuid_set is None:
                # Make sure that no node has this transaction.
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid is not None:
                        node = nm.getByUUID(uuid)
                        if node.isStorage():
                            conn.notify(Packets.DeleteTransaction(tid))
            else:
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid in uuid_set:
                        conn.ask(Packets.CommitTransaction(tid))

            # If possible, send the packets now.
            em.poll(0)

        # At this stage, all non-working nodes are out-of-date.
        cell_list = self.pt.outdate()

        # Tweak the partition table, if the distribution of storage nodes
        # is not uniform.
        cell_list.extend(self.pt.tweak())

        # If anything changed, send the changes.
        if cell_list:
            self.broadcastPartitionChanges(self.pt.setNextID(), cell_list)

    def provideService(self):
        """This is the normal mode for a primary master node. Handle
        transactions and stop the service only if a catastrophe happens or
        the user requests a shutdown."""
        logging.info('provide service')
        em = self.em
        nm = self.nm

        self.changeClusterState(ClusterStates.RUNNING)

        # This dictionary is used to hold information on transactions being
        # finished.
        self.finishing_transaction_dict = {}

        # Now everything is passive.
        while True:
            try:
                em.poll(1)
            except OperationFailure:
                # If not operational, send Stop Operation packets to storage
                # nodes and client nodes. Abort connections to client nodes.
                logging.critical('No longer operational, so stopping the service')
                for conn in em.getConnectionList():
                    node = nm.getByUUID(conn.getUUID())
                    if node is not None and (node.isStorage() or node.isClient()):
                        conn.notify(Packets.StopOperation())
                        if node.isClient():
                            conn.abort()

                # Then, go back, and restart.
                return

    def playPrimaryRole(self):
        logging.info('play the primary role with %s (%s:%d)',
                dump(self.uuid), *(self.server))

        # all incoming connections identify through this handler
        self.listening_conn.setHandler(identification.IdentificationHandler(self))

        handler = secondary.SecondaryMasterHandler(self)
        em = self.em
        nm = self.nm

        # Make sure that every connection has the secondary event handler.
        for conn in em.getConnectionList():
            conn_uuid = conn.getUUID()
            if conn_uuid is not None:
                node = nm.getByUUID(conn_uuid)
                assert node is not None
                assert node.isMaster()
                conn.setHandler(handler)

        # If I know any storage nodes, make sure that they are not in the
        # running state, because they are not connected at this stage.
        for node in nm.getStorageList():
            if node.isRunning():
                node.setTemporarilyDown()

        # recover the cluster status at startup
        self.recoverStatus()

        while 1:
            try:
                self.verifyData()
            except VerificationFailure:
                continue
            self.provideService()

    def playSecondaryRole(self):
        """I play a secondary role, thus only wait for a primary master to fail."""
        logging.info('play the secondary role with %s (%s:%d)',
                dump(self.uuid), *(self.server))

        # apply the new handler to the primary connection
        client_list = self.em.getClientList()
        assert len(client_list) == 1
        client_list[0].setHandler(secondary.PrimaryHandler(self))

        # and another for the future incoming connections
        handler = identification.IdentificationHandler(self)
        self.listening_conn.setHandler(handler)

        while 1:
            self.em.poll(1)

    def changeClusterState(self, state):
        """ Change the cluster state and apply the right handler on each
        connection """
        if self.cluster_state == state:
            return
        nm, em = self.nm, self.em

        # select the storage handler
        if state == ClusterStates.BOOTING:
            storage_handler = recovery.RecoveryHandler
        elif state == ClusterStates.RECOVERING:
            storage_handler = recovery.RecoveryHandler
        elif state == ClusterStates.VERIFYING:
            storage_handler = verification.VerificationHandler
        elif state == ClusterStates.RUNNING:
            storage_handler = storage.StorageServiceHandler
        else:
            raise RuntimeError('unexpected cluster state')

        # change handlers
        notification_packet = Packets.NotifyClusterInformation(state)
        for conn in em.getConnectionList():
            node = nm.getByUUID(conn.getUUID())
            if conn.isListening() or node is None:
                # not identified or listening, keep the identification handler
                continue
            conn.notify(notification_packet)
            if node.isAdmin() or node.isMaster():
                # those node types keep their own handler
                continue
            if node.isClient():
                if state != ClusterStates.RUNNING:
                    conn.close()
                handler = client.ClientServiceHandler
            elif node.isStorage():
                handler = storage_handler
            handler = handler(self)
            conn.setHandler(handler)
            handler.connectionCompleted(conn)
        self.cluster_state = state

    def getNewOIDList(self, num_oids):
        if self.loid is None:
            raise RuntimeError, 'I do not know the last OID'
        oid = unpack('!Q', self.loid)[0] + 1
        oid_list = [pack('!Q', oid + i) for i in xrange(num_oids)]
        self.loid = oid_list[-1]
        self.broadcastLastOID(self.loid)
        return oid_list
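
    # A worked example of the arithmetic above: if loid is pack('!Q', 5),
    # getNewOIDList(3) returns [pack('!Q', 6), pack('!Q', 7), pack('!Q', 8)],
    # and pack('!Q', 8) becomes the new last OID, broadcast to storage nodes.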

    def getNewUUID(self, node_type):
        # build a UUID
        uuid = os.urandom(15)
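        # The final UUID is 16 bytes: a one-byte namespace prefix encoding
        # the node type, followed by these 15 random bytes (rerolled below
        # if they collide with the random part of INVALID_UUID).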
        while uuid == protocol.INVALID_UUID[1:]:
            uuid = os.urandom(15)
        # look for the prefix
        prefix = UUID_NAMESPACES.get(node_type, None)
        if prefix is None:
            raise RuntimeError, 'No UUID namespace found for this node type'
        return prefix + uuid

    def isValidUUID(self, uuid, addr):
        node = self.nm.getByUUID(uuid)
        if node is not None and node.getAddress() is not None \
                and node.getAddress() != addr:
            return False
        return uuid != self.uuid and uuid is not None

    def getClusterState(self):
        return self.cluster_state

    def shutdown(self):
        """Close all connections and exit"""
        # change handler
        handler = shutdown.ShutdownHandler(self)
        for c in self.em.getConnectionList():
            c.setHandler(handler)

        # wait for all transactions to be finished
        while 1:
            self.em.poll(1)
            if len(self.finishing_transaction_dict) == 0:
                if self.cluster_state == ClusterStates.RUNNING:
                    sys.exit("Application has been asked to shut down")
                else:
                    # no more transactions, ask clients to shut down
                    logging.info("asking all clients to shutdown")
                    for c in self.em.getConnectionList():
                        node = self.nm.getByUUID(c.getUUID())
                        if node.isClient():
                            node_list = [(node.getType(), node.getAddress(),
                                node.getUUID(), NodeStates.DOWN)]
                            c.notify(Packets.NotifyNodeInformation(node_list))
                    # then ask storage and master nodes to shut down
                    logging.info("asking all remaining nodes to shutdown")
                    for c in self.em.getConnectionList():
                        node = self.nm.getByUUID(c.getUUID())
                        if node.isStorage() or node.isMaster():
                            node_list = [(node.getType(), node.getAddress(),
                                node.getUUID(), NodeStates.DOWN)]
                            c.notify(Packets.NotifyNodeInformation(node_list))
                    # then shut down
                    sys.exit("Cluster has been asked to shut down")

    def identifyStorageNode(self, uuid, node):
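        # Return (uuid, state, handler) for a storage peer depending on the
        # cluster state; identification is refused while recovering without
        # a UUID and while the cluster is stopping.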
        state = NodeStates.RUNNING
        handler = None
        if self.cluster_state == ClusterStates.RECOVERING:
            if uuid is None:
                logging.info('reject empty storage node')
                raise protocol.NotReadyError
            handler = recovery.RecoveryHandler
        elif self.cluster_state == ClusterStates.VERIFYING:
            if uuid is None or node is None:
                # if the node is unknown, it has been forgotten when the
                # current partition table was validated by the admin.
                # Here the uuid is not cleared, to allow looking up pending
                # nodes by uuid from the test framework. It's safe since
                # nodes with a conflicting UUID are rejected in the
                # identification handler.
                state = NodeStates.PENDING
            handler = verification.VerificationHandler
        elif self.cluster_state == ClusterStates.RUNNING:
            if uuid is None or node is None:
                # same as for verification
                state = NodeStates.PENDING
            handler = storage.StorageServiceHandler
        elif self.cluster_state == ClusterStates.STOPPING:
            raise protocol.NotReadyError
        else:
            raise RuntimeError('unhandled cluster state')
        return (uuid, state, handler)

    def identifyNode(self, node_type, uuid, node):
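        # Return a 5-tuple (uuid, node, state, handler, node_ctor) telling
        # the caller which node constructor to use, the initial node state,
        # and which handler to attach to the connection.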

        state = NodeStates.RUNNING
        handler = identification.IdentificationHandler

        if node_type == NodeTypes.ADMIN:
            # always accept admin nodes
            node_ctor = self.nm.createAdmin
            handler = administration.AdministrationHandler
            logging.info('Accept an admin %s' % dump(uuid))
        elif node_type == NodeTypes.MASTER:
            if node is None:
                # unknown master, rejected
                raise protocol.ProtocolError('Reject an unknown master node')
            # always put other masters in waiting state
            node_ctor = self.nm.createMaster
            handler = secondary.SecondaryMasterHandler
            logging.info('Accept a master %s' % dump(uuid))
        elif node_type == NodeTypes.CLIENT:
            # refuse any client before running
            if self.cluster_state != ClusterStates.RUNNING:
                logging.info('Reject a connection from a client')
                raise protocol.NotReadyError
            node_ctor = self.nm.createClient
            handler = client.ClientServiceHandler
            logging.info('Accept a client %s' % dump(uuid))
        elif node_type == NodeTypes.STORAGE:
            node_ctor = self.nm.createStorage
            (uuid, state, handler) = self.identifyStorageNode(uuid, node)
            logging.info('Accept a storage (%s)' % state)
        return (uuid, node, state, handler, node_ctor)