master.py 9.31 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import logging

20
from neo.client.handlers import BaseHandler, AnswerBaseHandler
21
from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \
22
        RUNNING_STATE, TEMPORARILY_DOWN_STATE
23 24 25 26
from neo.node import MasterNode, StorageNode
from neo.pt import MTPartitionTable as PartitionTable
from neo.util import dump

27
class PrimaryBootstrapHandler(AnswerBaseHandler):
    """ Bootstrap handler used when looking for the primary master

    It processes the answers received from a master node while the
    application is still searching for the primary master: identification
    acceptance, "not ready" errors and the primary master answer.
    """

    @staticmethod
    def _closeConnection(conn):
        """ Close conn while holding its lock, releasing it in all cases """
        conn.lock()
        try:
            conn.close()
        finally:
            conn.release()

    def handleNotReady(self, conn, packet, message):
        """ Forget the master being tried and flag the node as not ready """
        app = self.app
        app.trying_master_node = None
        app.setNodeNotReady()

    def handleAcceptNodeIdentification(self, conn, packet, node_type,
                   uuid, address, num_partitions, num_replicas, your_uuid):
        """ Process the identification answer of the contacted node

        The connection is closed when the peer is not a master node, or
        when the address it reports differs from the one we connected to
        (the node is then also removed from the node manager).
        Otherwise the peer UUID is stored on both the connection and the
        node, the UUID assigned to us (your_uuid, if any) is adopted and
        a new partition table is created from num_partitions and
        num_replicas.
        """
        app = self.app
        node = app.nm.getNodeByServer(conn.getAddress())
        # this must be a master node
        if node_type != MASTER_NODE_TYPE:
            self._closeConnection(conn)
            return
        if conn.getAddress() != address:
            # The server address is different! Then why was
            # the connection successful?
            logging.error('%s:%d is waiting for %s:%d',
                          conn.getAddress()[0], conn.getAddress()[1], *address)
            app.nm.remove(node)
            self._closeConnection(conn)
            return

        conn.setUUID(uuid)
        node.setUUID(uuid)

        if your_uuid is not None:
            # got an uuid from the primary master
            app.uuid = your_uuid

        # Always create partition table
        app.pt = PartitionTable(num_partitions, num_replicas)

    def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
                                  known_master_list):
        """ Register the known masters and record who the primary is

        Every (address, uuid) pair of known_master_list is added to the
        node manager if its address is unknown, and its UUID is updated
        when the peer reports a different one.

        If primary_uuid designates a known node, it becomes
        app.primary_master_node; the connection is closed when that node
        is not the one currently being tried. If primary_uuid is None,
        any previously recorded primary is forgotten and the connection
        is closed.
        """
        app = self.app
        # Register new master nodes.
        for address, uuid in known_master_list:
            n = app.nm.getNodeByServer(address)
            if n is None:
                n = MasterNode(server=address)
                app.nm.add(n)
            # If I don't know the UUID yet, believe what the peer
            # told me at the moment. (A node without UUID reports None,
            # which necessarily differs from a non-None uuid.)
            if uuid is not None and n.getUUID() != uuid:
                n.setUUID(uuid)

        if primary_uuid is not None:
            primary_node = app.nm.getNodeByUUID(primary_uuid)
            if primary_node is None:
                # I don't know such a node. Probably this information
                # is old. So ignore it.
                logging.warning('Unknown primary master UUID: %s. Ignoring.',
                                dump(primary_uuid))
            else:
                app.primary_master_node = primary_node
                if app.trying_master_node is not primary_node:
                    # We are not talking to the primary: drop this
                    # connection.
                    app.trying_master_node = None
                    self._closeConnection(conn)
        else:
            if app.primary_master_node is not None:
                # The primary master node is not a primary master node
                # any longer.
                app.primary_master_node = None

            app.trying_master_node = None
            self._closeConnection(conn)

    def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
        """ No-op: this handler ignores the partition table answer """
        pass

    def handleAnswerNodeInformation(self, conn, packet, node_list):
        """ No-op: this handler ignores the node information answer """
        pass

120
class PrimaryNotificationsHandler(BaseHandler):
    """ Handler that processes the notifications from the primary master """

    def connectionClosed(self, conn):
        """ Close conn and forget the primary master if conn was its link

        app.master_conn and app.primary_master_node are reset only when
        conn is the registered master connection; otherwise the mismatch
        is logged. The base class implementation is then called.
        """
        app = self.app
        logging.critical("connection to primary master node closed")
        conn.lock()
        try:
            conn.close()
        finally:
            conn.release()
        if app.master_conn is conn:
            app.master_conn = None
            app.primary_master_node = None
        else:
            logging.warning('app.master_conn is %s, but we are closing %s',
                            app.master_conn, conn)
        super(PrimaryNotificationsHandler, self).connectionClosed(conn)

    def timeoutExpired(self, conn):
        """ Log the timeout (if a master connection is set) and delegate """
        app = self.app
        if app.master_conn is not None:
            assert conn is app.master_conn
            logging.critical("connection timeout to primary master node expired")
        BaseHandler.timeoutExpired(self, conn)

    def peerBroken(self, conn):
        """ Log the failure (if a master connection is set) and delegate """
        app = self.app
        if app.master_conn is not None:
            assert conn is app.master_conn
            logging.critical("primary master node is broken")
        BaseHandler.peerBroken(self, conn)

    def handleStopOperation(self, conn, packet):
        """ Only log that the master asked to stop operation """
        logging.critical("master node ask to stop operation")

    def handleInvalidateObjects(self, conn, packet, oid_list, tid):
        """ Drop the given oids from the cache and invalidate them in the DB

        Runs with the application cache lock held. Each oid of oid_list
        is removed from app.mq_cache (missing entries are ignored), then
        db.invalidate(tid, oids) is called if app.getDB() returns a
        database.
        """
        app = self.app
        app._cache_lock_acquire()
        try:
            # ZODB required a dict with oid as key, so create it
            oids = {}
            for oid in oid_list:
                oids[oid] = tid
                try:
                    del app.mq_cache[oid]
                except KeyError:
                    # not cached, nothing to evict
                    pass
            db = app.getDB()
            if db is not None:
                db.invalidate(tid, oids)
        finally:
            app._cache_lock_release()

    def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
        """ Apply cell_list to the partition table if ptid is newer """
        app = self.app
        if app.ptid >= ptid:
            # Ignore this packet.
            return
        app.ptid = ptid
        app.pt.update(cell_list, app.nm)

    def handleSendPartitionTable(self, conn, packet, ptid, row_list):
        """ Load the rows sent by the primary master into the partition table

        The table is cleared first when ptid differs from app.ptid.
        Storage nodes referenced by a row but unknown to the node manager
        are registered in TEMPORARILY_DOWN_STATE.
        """
        # sendPartitionTable basically is an answer to askPrimaryMaster.
        # Extract from P-NEO-Protocol.Description:
        #  Connection to primary master node (PMN in service state)
        #   CN -> PMN : askPrimaryMaster
        #   PMN -> CN : answerPrimaryMaster containing primary uuid and no
        #               known master list
        #   PMN -> CN : notifyNodeInformation containing list of all
        #               nodes
        #   PMN -> CN : sendPartitionTable containing partition table id and
        #               list of rows
        # notifyNodeInformation is valid as asynchronous event, but
        # sendPartitionTable is only triggered after askPrimaryMaster.
        app = self.app
        nm = app.nm
        pt = app.pt
        if app.ptid != ptid:
            app.ptid = ptid
            pt.clear()
        for offset, row in row_list:
            for uuid, state in row:
                node = nm.getNodeByUUID(uuid)
                if node is None:
                    node = StorageNode(uuid = uuid)
                    node.setState(TEMPORARILY_DOWN_STATE)
                    nm.add(node)
                pt.setCell(offset, node, state)

    def handleNotifyNodeInformation(self, conn, packet, node_list):
        """ Update the node manager and drop links to stopped storages

        After forwarding node_list to the node manager, every storage
        node reported in a state other than RUNNING_STATE has its
        connection (if any) closed and removed from the connection pool,
        and a (connection, None) item is put on each dispatcher queue
        registered for that connection.
        """
        app = self.app
        nm = app.nm
        nm.update(node_list)
        for node_type, addr, uuid, state in node_list:
            # Only storage nodes that are no longer running matter here.
            if node_type != STORAGE_NODE_TYPE or state == RUNNING_STATE:
                continue
            # close connection to this storage if no longer running
            # (a distinct name is used so that the master connection
            # received as 'conn' is not shadowed)
            storage_conn = app.em.getConnectionByUUID(uuid)
            if storage_conn is None:
                continue
            storage_conn.lock()
            try:
                storage_conn.close()
            finally:
                storage_conn.release()
            # Remove from pool connection
            # NOTE(review): the original passed an undefined name 'n';
            # presumably removeConnection expects the node object -
            # confirm against the connection pool implementation.
            node = nm.getNodeByUUID(uuid)
            if node is not None:
                app.cp.removeConnection(node)
            # Put fake packets to task queues.
            # XXX: this should be done in MTClientConnection
            message_table = self.dispatcher.message_table
            # Iterate over a snapshot of the keys since entries are
            # popped inside the loop.
            for key in list(message_table.keys()):
                if id(storage_conn) == key[0]:
                    queue = message_table.pop(key)
                    queue.put((storage_conn, None))

238
class PrimaryAnswersHandler(AnswerBaseHandler):
    """ Handler for the packets expected as answers from the primary master """

    def handleAnswerNewTID(self, conn, packet, tid):
        """ Hand the received tid over to the application via setTID """
        self.app.setTID(tid)

    def handleAnswerNewOIDs(self, conn, packet, oid_list):
        """ Store oid_list on the application and reverse it in place """
        self.app.new_oid_list = oid_list
        self.app.new_oid_list.reverse()

    def handleNotifyTransactionFinished(self, conn, packet, tid):
        """ Mark the transaction finished when tid equals app.getTID() """
        app = self.app
        is_current = tid == app.getTID()
        if is_current:
            app.setTransactionFinished()