# recovery.py
#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from struct import pack

20
import neo
21 22
from neo.util import dump
from neo.protocol import Packets, ProtocolError, ClusterStates, NodeStates
23
from neo.protocol import NotReadyError, ZERO_OID, ZERO_TID
24 25 26 27 28 29 30 31 32 33 34 35
from neo.master.handlers import MasterHandler

REQUIRED_NODE_NUMBER = 1

class RecoveryManager(MasterHandler):
    """
      Manage the cluster recovery.

      While the cluster is in the RECOVERING state, this object both drives
      the recovery (see run) and handles the storage node connections: it
      collects the last OID, last TID and partition table ID reported by the
      storage nodes and loads the newest partition table among them.
    """

    def __init__(self, app):
        super(RecoveryManager, self).__init__(app)
        # Highest partition table ID reported so far by a storage node
        # (None until one is reported). Only a partition table carrying
        # exactly this ID is accepted in answerPartitionTable.
        self.target_ptid = None

    def getHandler(self):
        """Return the connection handler to use: this manager itself."""
        return self

    def identifyStorageNode(self, uuid, node):
        """
            Returns the handler for storage nodes.

            A storage node identifying without a UUID (logged as an "empty"
            storage node) is rejected with NotReadyError as long as startup
            is not allowed. Otherwise return (uuid, NodeStates.RUNNING, self).
        """
        if uuid is None and not self.app._startup_allowed:
            neo.logging.info('reject empty storage node')
            raise NotReadyError
        return (uuid, NodeStates.RUNNING, self)

    def run(self):
        """
        Recover the status about the cluster. Obtain the last OID, the last
        TID, and the last Partition Table ID from storage nodes, then get
        back the latest partition table or make a new table from scratch,
        if this is the first time.

        Storage nodes that are connected but absent from the selected
        partition table are set pending, and their state is broadcast.
        """
        app = self.app
        neo.logging.info('begin the recovery of the status')

        app.changeClusterState(ClusterStates.RECOVERING)
        em = app.em

        # Forget the known last OID and partition table ID: they are
        # re-learnt from the storage nodes' answers (answerLastIDs and
        # answerPartitionTable).
        app.tm.setLastOID(None)
        app.pt.setID(None)

        # collect the last partition table available: storage answers are
        # processed by the event loop until startup is allowed
        while not app._startup_allowed:
            em.poll(1)

        neo.logging.info('startup allowed')

        # no partition table was loaded from any storage: build a new one
        if app.pt.getID() is None:
            self.buildFromScratch()

        # collect nodes that are connected but not in the selected partition
        # table and set them in pending state
        allowed_node_set = set(app.pt.getNodeList())
        refused_node_set = set(app.nm.getStorageList()) - allowed_node_set
        for node in refused_node_set:
            node.setPending()
        app.broadcastNodesInformation(refused_node_set)

        app.setLastTransaction(app.tm.getLastTID())
        neo.logging.debug('cluster starts with loid=%s and this partition '
            'table :', dump(app.tm.getLastOID()))
        app.pt.log()

    def buildFromScratch(self):
        """
        Create a new partition table from the first REQUIRED_NODE_NUMBER
        storage nodes, polling the event manager until that many are known.

        The selected nodes are set running and broadcast, and the last OID
        is reset to ZERO_OID before the table is made.
        """
        nm, em, pt = self.app.nm, self.app.em, self.app.pt
        neo.logging.debug('creating a new partition table, wait for a '
            'storage node')
        # wait for storage nodes; empty ones are accepted now that startup
        # is allowed (see identifyStorageNode)
        while len(nm.getStorageList()) < REQUIRED_NODE_NUMBER:
            em.poll(1)
        # take the first nodes available
        node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER]
        for node in node_list:
            node.setRunning()
        self.app.broadcastNodesInformation(node_list)
        # reset IDs generators
        self.app.tm.setLastOID(ZERO_OID)
        # build the partition table with these nodes
        pt.make(node_list)

    def connectionLost(self, conn, new_state):
        """Set the node behind conn to new_state unless it already is."""
        node = self.app.nm.getByUUID(conn.getUUID())
        assert node is not None
        if node.getState() == new_state:
            return
        node.setState(new_state)

    def connectionCompleted(self, conn):
        """While startup is not allowed, ask the peer for its last IDs."""
        # XXX: handler split review needed to remove this hack
        if not self.app._startup_allowed:
            # ask the last IDs to perform the recovery
            conn.ask(Packets.AskLastIDs())

    def answerLastIDs(self, conn, loid, ltid, lptid):
        """
        Merge the last IDs reported by a storage node.

        The last OID and last TID keep the maximum of all reported values
        (None means "nothing reported"). If lptid is newer than the partition
        table ID currently targeted, it becomes the new target and the node
        is asked for its partition table.
        """
        tm = self.app.tm
        # Keep max values. None is handled explicitly instead of relying on
        # Python 2 ordering it below every other value.
        if loid is not None:
            last_oid = tm.getLastOID()
            if last_oid is None or loid > last_oid:
                tm.setLastOID(loid)
        if ltid is not None:
            # Previously stored unconditionally, so the last storage to
            # answer won even with an older TID.
            last_tid = tm.getLastTID()
            if last_tid is None or ltid > last_tid:
                tm.setLastTID(ltid)
        if lptid is not None and (self.target_ptid is None
                                  or lptid > self.target_ptid):
            # something newer
            self.target_ptid = lptid
            conn.ask(Packets.AskPartitionTable())

    def answerPartitionTable(self, conn, ptid, row_list):
        """
        Load the partition table sent by a storage node, if it carries the
        targeted ID, then send the resulting node information and partition
        table to every identified admin node.

        Raises ProtocolError('Invalid offset') if loading the table raises
        IndexError.
        """
        if ptid != self.target_ptid:
            # If this is not from a target node, ignore it.
            neo.logging.warn('Got %s while waiting %s', dump(ptid),
                    dump(self.target_ptid))
            return
        try:
            new_nodes = self.app.pt.load(ptid, row_list, self.app.nm)
        except IndexError:
            raise ProtocolError('Invalid offset')
        else:
            notification = Packets.NotifyNodeInformation(new_nodes)
            ptid = self.app.pt.getID()
            row_list = self.app.pt.getRowList()
            partition_table = Packets.SendPartitionTable(ptid, row_list)
            # notify the admin nodes
            for node in self.app.nm.getAdminList(only_identified=True):
                node.notify(notification)
                node.notify(partition_table)