Commit 33b2bcd9 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Move each handler in its own file.

Move common handlers in handler.py module.
Add InitializationHandler, applied on the master connection while retreiving
the partition table and node list.
Replace a call to (inexistant) method handleUnexpectedPacket with a raise of
UnexpectedPacketError exception.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@788 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 75c330b2
...@@ -134,7 +134,7 @@ class Application(object): ...@@ -134,7 +134,7 @@ class Application(object):
self.nm.add(MasterNode(server = server)) self.nm.add(MasterNode(server = server))
# Make a listening port # Make a listening port
handler = handlers.IdentificationEventHandler(self) handler = handlers.IdentificationHandler(self)
self.listening_conn = ListeningConnection(self.em, handler, self.listening_conn = ListeningConnection(self.em, handler,
addr=self.server, connector_handler=self.connector_handler) addr=self.server, connector_handler=self.connector_handler)
...@@ -156,6 +156,7 @@ class Application(object): ...@@ -156,6 +156,7 @@ class Application(object):
if node is not None and node.getState() == HIDDEN_STATE: if node is not None and node.getState() == HIDDEN_STATE:
self.wait() self.wait()
self.verifyData() self.verifyData()
self.initialize()
self.doOperation() self.doOperation()
except OperationFailure: except OperationFailure:
logging.error('operation stopped') logging.error('operation stopped')
...@@ -163,8 +164,8 @@ class Application(object): ...@@ -163,8 +164,8 @@ class Application(object):
# this must be handle in order not to fail # this must be handle in order not to fail
self.operational = False self.operational = False
except PrimaryFailure: except PrimaryFailure, msg:
logging.error('primary master is down') logging.error('primary master is down : %s' % msg)
def connectToPrimaryMaster(self): def connectToPrimaryMaster(self):
"""Find a primary master node, and connect to it. """Find a primary master node, and connect to it.
...@@ -184,7 +185,7 @@ class Application(object): ...@@ -184,7 +185,7 @@ class Application(object):
self.ptid = self.dm.getPTID() self.ptid = self.dm.getPTID()
# bootstrap handler, only for outgoing connections # bootstrap handler, only for outgoing connections
handler = handlers.BootstrapEventHandler(self) handler = handlers.BootstrapHandler(self)
em = self.em em = self.em
nm = self.nm nm = self.nm
...@@ -228,7 +229,7 @@ class Application(object): ...@@ -228,7 +229,7 @@ class Application(object):
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node is self.primary_master_node: if node is self.primary_master_node:
# Yes, I have. # Yes, I have.
conn.setHandler(handlers.VerificationEventHandler(self)) conn.setHandler(handlers.VerificationHandler(self))
self.master_conn = conn self.master_conn = conn
return return
...@@ -237,20 +238,28 @@ class Application(object): ...@@ -237,20 +238,28 @@ class Application(object):
Connections from client nodes may not be accepted at this stage.""" Connections from client nodes may not be accepted at this stage."""
logging.info('verifying data') logging.info('verifying data')
handler = handlers.VerificationEventHandler(self) handler = handlers.VerificationHandler(self)
self.master_conn.setHandler(handler) self.master_conn.setHandler(handler)
em = self.em em = self.em
while not self.operational: while not self.operational:
em.poll(1) em.poll(1)
def initialize(self):
""" Retreive partition table and node informations from the primary """
logging.info('initializing...')
handler = handlers.InitializationHandler(self)
self.master_conn.setHandler(handler)
# ask node list and partition table # ask node list and partition table
self.pt.clear() self.pt.clear()
self.master_conn.ask(protocol.askNodeInformation()) self.master_conn.ask(protocol.askNodeInformation())
self.master_conn.ask(protocol.askPartitionTable(())) self.master_conn.ask(protocol.askPartitionTable(()))
while not self.has_node_information or not self.has_partition_table: while not self.has_node_information or not self.has_partition_table:
em.poll(1) self.em.poll(1)
self.ready = True self.ready = True
# TODO: notify the master that I switch to operation state, so other
# nodes can now connect to me
def doOperation(self): def doOperation(self):
"""Handle everything, including replications and transactions.""" """Handle everything, including replications and transactions."""
...@@ -259,7 +268,7 @@ class Application(object): ...@@ -259,7 +268,7 @@ class Application(object):
em = self.em em = self.em
nm = self.nm nm = self.nm
handler = handlers.MasterOperationEventHandler(self) handler = handlers.MasterOperationHandler(self)
self.master_conn.setHandler(handler) self.master_conn.setHandler(handler)
# Forget all unfinished data. # Forget all unfinished data.
...@@ -292,7 +301,7 @@ class Application(object): ...@@ -292,7 +301,7 @@ class Application(object):
def wait(self): def wait(self):
# change handler # change handler
logging.info("waiting in hidden state") logging.info("waiting in hidden state")
handler = handlers.HiddenEventHandler(self) handler = handlers.HiddenHandler(self)
for conn in self.em.getConnectionList(): for conn in self.em.getConnectionList():
conn.setHandler(handler) conn.setHandler(handler)
......
from neo.storage.handlers.hidden import HiddenEventHandler from neo.storage.handlers.identification import IdentificationHandler
from neo.storage.handlers.identification import IdentificationEventHandler from neo.storage.handlers.initialization import InitializationHandler
from neo.storage.handlers.bootstrap import BootstrapEventHandler from neo.storage.handlers.verification import VerificationHandler
from neo.storage.handlers.verification import VerificationEventHandler from neo.storage.handlers.replication import ReplicationHandler
from neo.storage.handlers.operation import MasterOperationEventHandler, \ from neo.storage.handlers.bootstrap import BootstrapHandler
ClientOperationEventHandler, StorageOperationEventHandler from neo.storage.handlers.storage import StorageOperationHandler
from neo.storage.handlers.master import MasterOperationHandler
from neo.storage.handlers.client import ClientOperationHandler
from neo.storage.handlers.hidden import HiddenHandler
...@@ -17,20 +17,20 @@ ...@@ -17,20 +17,20 @@
import logging import logging
from neo.storage.handlers.handler import StorageEventHandler from neo.storage.handlers.handler import BaseStorageHandler
from neo.protocol import INVALID_UUID, MASTER_NODE_TYPE, STORAGE_NODE_TYPE from neo.protocol import INVALID_UUID, MASTER_NODE_TYPE, STORAGE_NODE_TYPE
from neo.node import MasterNode from neo.node import MasterNode
from neo import protocol from neo import protocol
from neo.pt import PartitionTable from neo.pt import PartitionTable
from neo.util import dump from neo.util import dump
class BootstrapEventHandler(StorageEventHandler): class BootstrapHandler(BaseStorageHandler):
"""This class deals with events for a bootstrap phase.""" """This class deals with events for a bootstrap phase."""
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
app = self.app app = self.app
conn.ask(protocol.askPrimaryMaster()) conn.ask(protocol.askPrimaryMaster())
StorageEventHandler.connectionCompleted(self, conn) BaseStorageHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
app = self.app app = self.app
...@@ -39,8 +39,7 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -39,8 +39,7 @@ class BootstrapEventHandler(StorageEventHandler):
# So this would effectively mean that it is dead. # So this would effectively mean that it is dead.
app.primary_master_node = None app.primary_master_node = None
app.trying_master_node = None app.trying_master_node = None
BaseStorageHandler.connectionFailed(self, conn)
StorageEventHandler.connectionFailed(self, conn)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
app = self.app app = self.app
...@@ -48,7 +47,7 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -48,7 +47,7 @@ class BootstrapEventHandler(StorageEventHandler):
# If a primary master node timeouts, I should not rely on it. # If a primary master node timeouts, I should not rely on it.
app.primary_master_node = None app.primary_master_node = None
app.trying_master_node = None app.trying_master_node = None
StorageEventHandler.timeoutExpired(self, conn) BaseStorageHandler.timeoutExpired(self, conn)
def connectionClosed(self, conn): def connectionClosed(self, conn):
app = self.app app = self.app
...@@ -56,7 +55,7 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -56,7 +55,7 @@ class BootstrapEventHandler(StorageEventHandler):
# If a primary master node closes, I should not rely on it. # If a primary master node closes, I should not rely on it.
app.primary_master_node = None app.primary_master_node = None
app.trying_master_node = None app.trying_master_node = None
StorageEventHandler.connectionClosed(self, conn) BaseStorageHandler.connectionClosed(self, conn)
def peerBroken(self, conn): def peerBroken(self, conn):
app = self.app app = self.app
...@@ -65,7 +64,7 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -65,7 +64,7 @@ class BootstrapEventHandler(StorageEventHandler):
# on it. # on it.
app.primary_master_node = None app.primary_master_node = None
app.trying_master_node = None app.trying_master_node = None
StorageEventHandler.peerBroken(self, conn) BaseStorageHandler.peerBroken(self, conn)
def handleNotReady(self, conn, packet, message): def handleNotReady(self, conn, packet, message):
app = self.app app = self.app
......
...@@ -18,12 +18,11 @@ ...@@ -18,12 +18,11 @@
import logging import logging
from neo import protocol from neo import protocol
from neo.storage.handlers.handler import StorageEventHandler from neo.storage.handlers.handler import BaseClientAndStorageOperationHandler
from neo.protocol import INVALID_SERIAL, INVALID_TID, INVALID_PARTITION, \ from neo.protocol import INVALID_SERIAL, INVALID_TID, INVALID_PARTITION, \
TEMPORARILY_DOWN_STATE, DISCARDED_STATE, OUT_OF_DATE_STATE TEMPORARILY_DOWN_STATE, DISCARDED_STATE, OUT_OF_DATE_STATE
from neo.util import dump from neo.util import dump
from neo.node import StorageNode
from neo.exception import PrimaryFailure, OperationFailure
class TransactionInformation(object): class TransactionInformation(object):
"""This class represents information on a transaction.""" """This class represents information on a transaction."""
...@@ -48,46 +47,7 @@ class TransactionInformation(object): ...@@ -48,46 +47,7 @@ class TransactionInformation(object):
return self._transaction return self._transaction
class ClientAndStorageOperationEventHandler(StorageEventHandler): class ClientOperationHandler(BaseClientAndStorageOperationHandler):
""" Accept requests common to client and storage nodes """
def handleAskTIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return TIDs only
# about usable partitions assigned to me.
if first >= last:
raise protocol.ProtocolError('invalid offsets')
app = self.app
if partition == INVALID_PARTITION:
# Collect all usable partitions for me.
getCellList = app.pt.getCellList
partition_list = []
for offset in xrange(app.num_partitions):
for cell in getCellList(offset, readable=True):
if cell.getUUID() == app.uuid:
partition_list.append(offset)
break
else:
partition_list = [partition]
tid_list = app.dm.getTIDList(first, last - first,
app.num_partitions, partition_list)
conn.answer(protocol.answerTIDs(tid_list), packet)
def handleAskObjectHistory(self, conn, packet, oid, first, last):
if first >= last:
raise protocol.ProtocolError( 'invalid offsets')
app = self.app
history_list = app.dm.getObjectHistory(oid, first, last - first)
if history_list is None:
history_list = []
p = protocol.answerObjectHistory(oid, history_list)
conn.answer(p, packet)
class ClientOperationEventHandler(ClientAndStorageOperationEventHandler):
def dealWithClientFailure(self, uuid): def dealWithClientFailure(self, uuid):
app = self.app app = self.app
...@@ -106,18 +66,18 @@ class ClientOperationEventHandler(ClientAndStorageOperationEventHandler): ...@@ -106,18 +66,18 @@ class ClientOperationEventHandler(ClientAndStorageOperationEventHandler):
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
self.dealWithClientFailure(conn.getUUID()) self.dealWithClientFailure(conn.getUUID())
StorageEventHandler.timeoutExpired(self, conn) BaseClientAndStorageOperationHandler.timeoutExpired(self, conn)
def connectionClosed(self, conn): def connectionClosed(self, conn):
self.dealWithClientFailure(conn.getUUID()) self.dealWithClientFailure(conn.getUUID())
StorageEventHandler.connectionClosed(self, conn) BaseClientAndStorageOperationHandler.connectionClosed(self, conn)
def peerBroken(self, conn): def peerBroken(self, conn):
self.dealWithClientFailure(conn.getUUID()) self.dealWithClientFailure(conn.getUUID())
StorageEventHandler.peerBroken(self, conn) BaseClientAndStorageOperationHandler.peerBroken(self, conn)
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
ClientAndStorageOperationEventHandler.connectionCompleted(self, conn) BaseClientAndStorageOperationHandler.connectionCompleted(self, conn)
def handleAskObject(self, conn, packet, oid, serial, tid): def handleAskObject(self, conn, packet, oid, serial, tid):
app = self.app app = self.app
...@@ -210,132 +170,3 @@ class ClientOperationEventHandler(ClientAndStorageOperationEventHandler): ...@@ -210,132 +170,3 @@ class ClientOperationEventHandler(ClientAndStorageOperationEventHandler):
conn.answer(p, packet) conn.answer(p, packet)
app.store_lock_dict[oid] = tid app.store_lock_dict[oid] = tid
class StorageOperationEventHandler(ClientAndStorageOperationEventHandler):
def connectionCompleted(self, conn):
ClientAndStorageOperationEventHandler.connectionCompleted(self, conn)
def handleAskOIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return OIDs only
# about usable partitions assigned to me.
if first >= last:
raise protocol.ProtocolError('invalid offsets')
app = self.app
if partition == INVALID_PARTITION:
# Collect all usable partitions for me.
getCellList = app.pt.getCellList
partition_list = []
for offset in xrange(app.num_partitions):
for cell in getCellList(offset, readable=True):
if cell.getUUID() == app.uuid:
partition_list.append(offset)
break
else:
partition_list = [partition]
oid_list = app.dm.getOIDList(first, last - first,
app.num_partitions, partition_list)
conn.answer(protocol.answerOIDs(oid_list), packet)
class MasterOperationEventHandler(StorageEventHandler):
""" This handler is used for the primary master """
def timeoutExpired(self, conn):
logging.critical('the primary master node times out')
raise PrimaryFailure('the primary master node times out')
def connectionClosed(self, conn):
logging.critical('primary master connection closed')
raise PrimaryFailure('primary master connection closed')
def peerBroken(self, conn):
logging.critical('the primary master is broken')
raise PrimaryFailure('the primary master is broken')
def handleStopOperation(self, conn, packet):
raise OperationFailure('operation stopped')
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
self.app.replicator.setCriticalTID(packet, ltid)
def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
self.app.replicator.setUnfinishedTIDList(tid_list)
def handleAskTransactionInformation(self, conn, packet, tid):
app = self.app
t = app.dm.getTransaction(tid)
if t is None:
p = protocol.tidNotFound('%s does not exist' % dump(tid))
else:
p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
conn.answer(p, packet)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
app = self.app
nm = app.nm
pt = app.pt
if app.ptid >= ptid:
# Ignore this packet.
logging.info('ignoring older partition changes')
return
# First, change the table on memory.
app.ptid = ptid
for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
if uuid == app.uuid:
# If this is for myself, this can affect replications.
if state == DISCARDED_STATE:
app.replicator.removePartition(offset)
elif state == OUT_OF_DATE_STATE:
app.replicator.addPartition(offset)
# Then, the database.
app.dm.changePartitionTable(ptid, cell_list)
logging.info('Partition table updated:')
self.app.pt.log()
def handleLockInformation(self, conn, packet, tid):
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
app.load_lock_dict[o[0]] = tid
app.dm.storeTransaction(tid, object_list, t.getTransaction())
except KeyError:
pass
conn.answer(protocol.notifyInformationLocked(tid), packet)
def handleUnlockInformation(self, conn, packet, tid):
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
oid = o[0]
del app.load_lock_dict[oid]
del app.store_lock_dict[oid]
app.dm.finishTransaction(tid)
del app.transaction_dict[tid]
# Now it may be possible to execute some events.
app.executeQueuedEvents()
except KeyError:
pass
...@@ -18,17 +18,17 @@ ...@@ -18,17 +18,17 @@
import logging import logging
from neo.handler import EventHandler from neo.handler import EventHandler
from neo import protocol
from neo.protocol import Packet, UnexpectedPacketError, \ from neo.protocol import Packet, UnexpectedPacketError, \
INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE
from neo.util import dump from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection
from neo.exception import PrimaryFailure, OperationFailure from neo.exception import PrimaryFailure, OperationFailure
from neo import decorators from neo import decorators
class StorageEventHandler(EventHandler): class BaseStorageHandler(EventHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
def __init__(self, app): def __init__(self, app):
self.app = app self.app = app
...@@ -67,40 +67,6 @@ class StorageEventHandler(EventHandler): ...@@ -67,40 +67,6 @@ class StorageEventHandler(EventHandler):
p = protocol.answerPrimaryMaster(primary_uuid, known_master_list) p = protocol.answerPrimaryMaster(primary_uuid, known_master_list)
conn.answer(p, packet) conn.answer(p, packet)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list):
raise NotImplementedError('this method must be overridden')
@decorators.identification_required
@decorators.restrict_node_types(MASTER_NODE_TYPE)
def handleAnnouncePrimaryMaster(self, conn, packet):
"""Theoretically speaking, I should not get this message,
because the primary master election must happen when I am
not connected to any master node."""
uuid = conn.getUUID()
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node is None:
raise RuntimeError('I do not know the uuid %r' % dump(uuid))
if app.primary_master_node is None:
# Hmm... I am somehow connected to the primary master already.
app.primary_master_node = node
if not isinstance(conn, ClientConnection):
# I do not want a connection from any master node. I rather
# want to connect from myself.
conn.close()
elif app.primary_master_node.getUUID() == uuid:
# Yes, I know you are the primary master node.
pass
else:
# It seems that someone else claims taking over the primary
# master node...
app.primary_master_node = None
raise PrimaryFailure('another master node wants to take over')
def handleReelectPrimaryMaster(self, conn, packet):
raise PrimaryFailure('re-election occurs')
@decorators.identification_required @decorators.identification_required
@decorators.restrict_node_types(MASTER_NODE_TYPE) @decorators.restrict_node_types(MASTER_NODE_TYPE)
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
...@@ -233,3 +199,58 @@ class StorageEventHandler(EventHandler): ...@@ -233,3 +199,58 @@ class StorageEventHandler(EventHandler):
def handleAskOIDs(self, conn, packet, first, last, partition): def handleAskOIDs(self, conn, packet, first, last, partition):
logging.info('ignoring ask oids') logging.info('ignoring ask oids')
pass pass
class BaseMasterHandler(BaseStorageHandler):
def timeoutExpired(self, conn):
raise PrimaryFailure('times out')
def connectionClosed(self, conn):
raise PrimaryFailure('dead')
def peerBroken(self, conn):
raise PrimaryFailure('broken')
def handleReelectPrimaryMaster(self, conn, packet):
raise PrimaryFailure('re-election occurs')
class BaseClientAndStorageOperationHandler(BaseStorageHandler):
""" Accept requests common to client and storage nodes """
def handleAskTIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return TIDs only
# about usable partitions assigned to me.
if first >= last:
raise protocol.ProtocolError('invalid offsets')
app = self.app
if partition == protocol.INVALID_PARTITION:
# Collect all usable partitions for me.
getCellList = app.pt.getCellList
partition_list = []
for offset in xrange(app.num_partitions):
for cell in getCellList(offset, readable=True):
if cell.getUUID() == app.uuid:
partition_list.append(offset)
break
else:
partition_list = [partition]
tid_list = app.dm.getTIDList(first, last - first,
app.num_partitions, partition_list)
conn.answer(protocol.answerTIDs(tid_list), packet)
def handleAskObjectHistory(self, conn, packet, oid, first, last):
if first >= last:
raise protocol.ProtocolError( 'invalid offsets')
app = self.app
history_list = app.dm.getObjectHistory(oid, first, last - first)
if history_list is None:
history_list = []
p = protocol.answerObjectHistory(oid, history_list)
conn.answer(p, packet)
...@@ -17,32 +17,32 @@ ...@@ -17,32 +17,32 @@
import logging import logging
from neo.storage.handlers.handler import StorageEventHandler from neo.storage.handlers.handler import BaseStorageHandler
from neo.protocol import Packet, \ from neo.protocol import Packet, \
INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE, \ DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE, \
DISCARDED_STATE, OUT_OF_DATE_STATE DISCARDED_STATE, OUT_OF_DATE_STATE, UnexpectedPacketError
from neo.node import StorageNode from neo.node import StorageNode
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo import decorators from neo import decorators
# FIXME: before move handlers, this one was inheriting from EventHandler # FIXME: before move handlers, this one was inheriting from EventHandler
# instead of StorageEventHandler # instead of BaseStorageHandler
class HiddenEventHandler(StorageEventHandler): class HiddenHandler(BaseStorageHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
def __init__(self, app): def __init__(self, app):
self.app = app self.app = app
EventHandler.__init__(self) BaseStorageHandler.__init__(self)
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
"""Store information on nodes, only if this is sent by a primary """Store information on nodes, only if this is sent by a primary
master node.""" master node."""
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None: if uuid is None:
self.handleUnexpectedPacket(conn, packet) raise UnexpectedPacketError
return
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
......
...@@ -17,12 +17,12 @@ ...@@ -17,12 +17,12 @@
import logging import logging
from neo.storage.handlers.handler import StorageEventHandler from neo.storage.handlers.handler import BaseStorageHandler
from neo.protocol import BROKEN_STATE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE from neo.protocol import BROKEN_STATE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
from neo import protocol from neo import protocol
from neo.util import dump from neo.util import dump
class IdentificationEventHandler(StorageEventHandler): class IdentificationHandler(BaseStorageHandler):
""" Handler used for incoming connections during operation state """ """ Handler used for incoming connections during operation state """
def connectionClosed(self, conn): def connectionClosed(self, conn):
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo.storage.handlers.handler import BaseMasterHandler
from neo.protocol import TEMPORARILY_DOWN_STATE
from neo import protocol
from neo.node import StorageNode
class InitializationHandler(BaseMasterHandler):
def handleAnswerNodeInformation(self, conn, packet, node_list):
assert not node_list
self.app.has_node_information = True
def handleNotifyNodeInformation(self, *args, **kw):
# FIXME: This message could be replaced by a SendNodeInformation to be
# consistent with SendPartitionTable.
BaseMasterHandler.handleNotifyNodeInformation(self, *args, **kw)
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
"""A primary master node sends this packet to synchronize a partition
table. Note that the message can be split into multiple packets."""
app = self.app
nm = app.nm
pt = app.pt
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
if pt.filled():
# If the table is filled, I assume that the table is ready
# to use. Thus install it into the database for persistency.
cell_list = []
for offset in xrange(app.num_partitions):
for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(),
cell.getState()))
app.dm.setPartitionTable(ptid, cell_list)
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
assert not row_list
self.app.has_partition_table = True
logging.info('Got the partition table :')
self.app.pt.log()
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo import protocol
from neo.storage.handlers.handler import BaseMasterHandler
from neo.protocol import INVALID_SERIAL, INVALID_TID, INVALID_PARTITION, \
TEMPORARILY_DOWN_STATE, DISCARDED_STATE, OUT_OF_DATE_STATE
from neo.util import dump
from neo.node import StorageNode
from neo.exception import PrimaryFailure, OperationFailure
class MasterOperationHandler(BaseMasterHandler):
""" This handler is used for the primary master """
def handleStopOperation(self, conn, packet):
raise OperationFailure('operation stopped')
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
self.app.replicator.setCriticalTID(packet, ltid)
def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
self.app.replicator.setUnfinishedTIDList(tid_list)
def handleAskTransactionInformation(self, conn, packet, tid):
app = self.app
t = app.dm.getTransaction(tid)
if t is None:
p = protocol.tidNotFound('%s does not exist' % dump(tid))
else:
p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
conn.answer(p, packet)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
app = self.app
nm = app.nm
pt = app.pt
if app.ptid >= ptid:
# Ignore this packet.
logging.info('ignoring older partition changes')
return
# First, change the table on memory.
app.ptid = ptid
for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
if uuid == app.uuid:
# If this is for myself, this can affect replications.
if state == DISCARDED_STATE:
app.replicator.removePartition(offset)
elif state == OUT_OF_DATE_STATE:
app.replicator.addPartition(offset)
# Then, the database.
app.dm.changePartitionTable(ptid, cell_list)
logging.info('Partition table updated:')
self.app.pt.log()
def handleLockInformation(self, conn, packet, tid):
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
app.load_lock_dict[o[0]] = tid
app.dm.storeTransaction(tid, object_list, t.getTransaction())
except KeyError:
pass
conn.answer(protocol.notifyInformationLocked(tid), packet)
def handleUnlockInformation(self, conn, packet, tid):
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
oid = o[0]
del app.load_lock_dict[oid]
del app.store_lock_dict[oid]
app.dm.finishTransaction(tid)
del app.transaction_dict[tid]
# Now it may be possible to execute some events.
app.executeQueuedEvents()
except KeyError:
pass
...@@ -18,10 +18,10 @@ ...@@ -18,10 +18,10 @@
import logging import logging
from neo.storage.handlers.handler import StorageEventHandler from neo.storage.handlers.handler import BaseStorageHandler
from neo import protocol from neo import protocol
class ReplicationEventHandler(StorageEventHandler): class ReplicationHandler(BaseStorageHandler):
"""This class handles events for replications.""" """This class handles events for replications."""
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo import protocol
from neo.storage.handlers.handler import BaseClientAndStorageOperationHandler
from neo.protocol import INVALID_SERIAL, INVALID_TID, INVALID_PARTITION, \
TEMPORARILY_DOWN_STATE, DISCARDED_STATE, OUT_OF_DATE_STATE
class StorageOperationHandler(BaseClientAndStorageOperationHandler):
def connectionCompleted(self, conn):
BaseClientAndStorageOperationHandler.connectionCompleted(self, conn)
def handleAskOIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return OIDs only
# about usable partitions assigned to me.
if first >= last:
raise protocol.ProtocolError('invalid offsets')
app = self.app
if partition == INVALID_PARTITION:
# Collect all usable partitions for me.
getCellList = app.pt.getCellList
partition_list = []
for offset in xrange(app.num_partitions):
for cell in getCellList(offset, readable=True):
if cell.getUUID() == app.uuid:
partition_list.append(offset)
break
else:
partition_list = [partition]
oid_list = app.dm.getOIDList(first, last - first,
app.num_partitions, partition_list)
conn.answer(protocol.answerOIDs(oid_list), packet)
...@@ -17,49 +17,16 @@ ...@@ -17,49 +17,16 @@
import logging import logging
from neo.storage.handlers.handler import StorageEventHandler from neo.storage.handlers.handler import BaseMasterHandler
from neo.protocol import INVALID_OID, INVALID_TID, TEMPORARILY_DOWN_STATE from neo.protocol import INVALID_OID, INVALID_TID, TEMPORARILY_DOWN_STATE
from neo import protocol from neo import protocol
from neo.util import dump from neo.util import dump
from neo.node import StorageNode from neo.node import StorageNode
from neo.exception import PrimaryFailure, OperationFailure from neo.exception import OperationFailure
class VerificationEventHandler(StorageEventHandler): class VerificationHandler(BaseMasterHandler):
"""This class deals with events for a verification phase.""" """This class deals with events for a verification phase."""
def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted."""
# I do not want to accept a connection at this phase, but
# someone might mistake me as a master node.
StorageEventHandler.connectionAccepted(self, conn, s, addr)
def timeoutExpired(self, conn):
# If a primary master node timeouts, I cannot continue.
logging.critical('the primary master node times out')
raise PrimaryFailure('the primary master node times out')
StorageEventHandler.timeoutExpired(self, conn)
def connectionClosed(self, conn):
# If a primary master node closes, I cannot continue.
logging.critical('the primary master node is dead')
raise PrimaryFailure('the primary master node is dead')
StorageEventHandler.connectionClosed(self, conn)
def peerBroken(self, conn):
# If a primary master node gets broken, I cannot continue.
logging.critical('the primary master node is broken')
raise PrimaryFailure('the primary master node is broken')
StorageEventHandler.peerBroken(self, conn)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list):
app = self.app
if app.primary_master_node.getUUID() != primary_uuid:
raise PrimaryFailure('the primary master node seems to have changed')
# XXX is it better to deal with known_master_list here?
# But a primary master node is supposed not to send any info
# with this packet, so it would be useless.
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
app = self.app app = self.app
oid = app.dm.getLastOID() or INVALID_OID oid = app.dm.getLastOID() or INVALID_OID
...@@ -67,16 +34,6 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -67,16 +34,6 @@ class VerificationEventHandler(StorageEventHandler):
p = protocol.answerLastIDs(oid, tid, app.ptid) p = protocol.answerLastIDs(oid, tid, app.ptid)
conn.answer(p, packet) conn.answer(p, packet)
def handleAnswerNodeInformation(self, conn, packet, node_list):
assert not node_list
self.app.has_node_information = True
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
assert not row_list
self.app.has_partition_table = True
logging.info('Got the partition table :')
self.app.pt.log()
def handleAskPartitionTable(self, conn, packet, offset_list): def handleAskPartitionTable(self, conn, packet, offset_list):
app = self.app app = self.app
row_list = [] row_list = []
...@@ -95,37 +52,6 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -95,37 +52,6 @@ class VerificationEventHandler(StorageEventHandler):
p = protocol.answerPartitionTable(app.ptid, row_list) p = protocol.answerPartitionTable(app.ptid, row_list)
conn.answer(p, packet) conn.answer(p, packet)
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
"""A primary master node sends this packet to synchronize a partition
table. Note that the message can be split into multiple packets."""
app = self.app
nm = app.nm
pt = app.pt
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
if pt.filled():
# If the table is filled, I assume that the table is ready
# to use. Thus install it into the database for persistency.
cell_list = []
for offset in xrange(app.num_partitions):
for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(),
cell.getState()))
app.dm.setPartitionTable(ptid, cell_list)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
the information is only about changes from the previous.""" the information is only about changes from the previous."""
......
...@@ -18,8 +18,7 @@ ...@@ -18,8 +18,7 @@
import logging import logging
from random import choice from random import choice
from neo.storage.handlers.handler import StorageEventHandler from neo.storage import handlers
from neo.storage.handlers.replication import ReplicationEventHandler
from neo import protocol from neo import protocol
from neo.protocol import STORAGE_NODE_TYPE, UP_TO_DATE_STATE, \ from neo.protocol import STORAGE_NODE_TYPE, UP_TO_DATE_STATE, \
OUT_OF_DATE_STATE, INVALID_TID, RUNNING_STATE OUT_OF_DATE_STATE, INVALID_TID, RUNNING_STATE
...@@ -183,7 +182,7 @@ class Replicator(object): ...@@ -183,7 +182,7 @@ class Replicator(object):
self.current_connection = None self.current_connection = None
if self.current_connection is None: if self.current_connection is None:
handler = ReplicationEventHandler(app) handler = handlers.ReplicationEventHandler(app)
self.current_connection = ClientConnection(app.em, handler, self.current_connection = ClientConnection(app.em, handler,
addr = addr, addr = addr,
connector_handler = app.connector_handler) connector_handler = app.connector_handler)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment