client.py 4.49 KB
Newer Older
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3

4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17

18
import neo.lib
19

20
from neo.lib.protocol import NodeStates, Packets, ProtocolError
21
from neo.master.handlers import MasterHandler
22
from neo.lib.util import dump
23
from neo.master.transactions import DelayedError
24

25
class ClientServiceHandler(MasterHandler):
26 27 28 29 30
    """ Handler dedicated to client during service state """

    def connectionCompleted(self, conn):
        pass

31
    def connectionLost(self, conn, new_state):
32
        # cancel its transactions and forgot the node
33 34 35 36 37 38 39
        app = self.app
        node = app.nm.getByUUID(conn.getUUID())
        assert node is not None
        app.tm.abortFor(node)
        node.setState(NodeStates.DOWN)
        app.broadcastNodesInformation([node])
        app.nm.remove(node)
40

41 42
    def askNodeInformation(self, conn):
        # send informations about master and storages only
43 44 45 46 47
        nm = self.app.nm
        node_list = []
        node_list.extend(n.asTuple() for n in nm.getMasterList())
        node_list.extend(n.asTuple() for n in nm.getStorageList())
        conn.notify(Packets.NotifyNodeInformation(node_list))
48 49
        conn.answer(Packets.AnswerNodeInformation())

50
    def askBeginTransaction(self, conn, tid):
51 52 53
        """
            A client request a TID, nothing is kept about it until the finish.
        """
54 55 56
        app = self.app
        node = app.nm.getByUUID(conn.getUUID())
        conn.answer(Packets.AnswerBeginTransaction(app.tm.begin(node, tid)))
57

58
    def askNewOIDs(self, conn, num_oids):
59 60 61
        app = self.app
        conn.answer(Packets.AnswerNewOIDs(app.tm.getNextOIDList(num_oids)))
        app.broadcastLastOID()
62

63
    def askFinishTransaction(self, conn, ttid, oid_list):
64 65 66
        app = self.app

        # Collect partitions related to this transaction.
67
        getPartition = app.pt.getPartition
68
        partition_set = set()
69
        partition_set.add(getPartition(ttid))
70 71 72 73
        partition_set.update((getPartition(oid) for oid in oid_list))

        # Collect the UUIDs of nodes related to this transaction.
        uuid_set = set()
74
        isStorageReady = app.isStorageReady
75
        for part in partition_set:
76 77 78 79
            uuid_set.update((uuid for uuid in (
                    cell.getUUID() for cell in app.pt.getCellList(part)
                    if cell.getNodeState() != NodeStates.HIDDEN)
                if isStorageReady(uuid)))
80

81 82 83 84 85 86 87
        if not uuid_set:
            raise ProtocolError('No storage node ready for transaction')

        identified_node_list = app.nm.getIdentifiedList(pool_set=uuid_set)
        usable_uuid_set = set((x.getUUID() for x in identified_node_list))
        partitions = app.pt.getPartitions()
        peer_id = conn.getPeerId()
88 89
        tid = app.tm.prepare(ttid, partitions, oid_list, usable_uuid_set,
            peer_id)
90

91
        # check if greater and foreign OID was stored
92 93
        if app.tm.updateLastOID(oid_list):
            app.broadcastLastOID()
94

95 96 97
        # Request locking data.
        # build a new set as we may not send the message to all nodes as some
        # might be not reachable at that time
98 99
        p = Packets.AskLockInformation(ttid, tid, oid_list)
        for node in identified_node_list:
100
            node.ask(p, timeout=60)
101

102 103 104
    def askPack(self, conn, tid):
        app = self.app
        if app.packing is None:
105
            storage_list = app.nm.getStorageList(only_identified=True)
106 107 108 109 110 111 112 113
            app.packing = (conn, conn.getPeerId(),
                set(x.getUUID() for x in storage_list))
            p = Packets.AskPack(tid)
            for storage in storage_list:
                storage.getConnection().ask(p)
        else:
            conn.answer(Packets.AnswerPack(False))

114 115 116 117
    def askLastTransaction(self, conn):
        conn.answer(Packets.AnswerLastTransaction(
            self.app.getLastTransaction()))

118
    def abortTransaction(self, conn, tid):
119
        self.app.tm.remove(conn.getUUID(), tid)
120