Commit 07b48079 authored by Vincent Pelletier's avatar Vincent Pelletier

Ignore some requests, based on connection state.

Some requests can be safely ignored when received over a closed connection.
This was previously done explicitly in handlers, but it turns out it would
cause a lot of code duplication. Instead, define the policy on a packet
type basis, and apply it to all packets upon reception, before passing it
to handler.
Also, protect request handlers when they respond, as connection might be
closed.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2419 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 7cab0aa8
...@@ -162,6 +162,10 @@ class HandlerSwitcher(object): ...@@ -162,6 +162,10 @@ class HandlerSwitcher(object):
def _handle(self, connection, packet): def _handle(self, connection, packet):
assert len(self._pending) == 1 or self._pending[0][0] assert len(self._pending) == 1 or self._pending[0][0]
PACKET_LOGGER.dispatch(connection, packet, 'from') PACKET_LOGGER.dispatch(connection, packet, 'from')
if connection.isClosed() and packet.ignoreOnClosedConnection():
neo.logging.debug('Ignoring packet %r on closed connection %r',
packet, connection)
return
msg_id = packet.getId() msg_id = packet.getId()
(request_dict, handler) = self._pending[0] (request_dict, handler) = self._pending[0]
# notifications are not expected # notifications are not expected
......
...@@ -28,13 +28,6 @@ class MasterHandler(EventHandler): ...@@ -28,13 +28,6 @@ class MasterHandler(EventHandler):
neo.logging.error('Protocol error %s %s', message, conn.getAddress()) neo.logging.error('Protocol error %s %s', message, conn.getAddress())
def askPrimary(self, conn): def askPrimary(self, conn):
if conn.getConnector() is None:
# Connection can be closed by peer after he sent AskPrimary
# if he finds the primary master before we answer him.
# The connection gets closed before this message gets processed
# because this message might have been queued, but connection
# interruption takes effect as soon as received.
return
app = self.app app = self.app
if app.primary: if app.primary:
primary_uuid = app.uuid primary_uuid = app.uuid
......
...@@ -193,14 +193,6 @@ class ServerElectionHandler(MasterHandler): ...@@ -193,14 +193,6 @@ class ServerElectionHandler(MasterHandler):
def requestIdentification(self, conn, node_type, def requestIdentification(self, conn, node_type,
uuid, address, name): uuid, address, name):
if conn.getConnector() is None:
# Connection can be closed by peer after he sent
# RequestIdentification if he finds the primary master before
# we answer him.
# The connection gets closed before this message gets processed
# because this message might have been queued, but connection
# interruption takes effect as soon as received.
return
self.checkClusterName(name) self.checkClusterName(name)
app = self.app app = self.app
if node_type != NodeTypes.MASTER: if node_type != NodeTypes.MASTER:
......
...@@ -140,10 +140,8 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -140,10 +140,8 @@ class StorageServiceHandler(BaseServiceHandler):
uid_set.remove(conn.getUUID()) uid_set.remove(conn.getUUID())
if not uid_set: if not uid_set:
app.packing = None app.packing = None
try: if not client.isClosed():
client.answer(Packets.AnswerPack(True), msg_id=msg_id) client.answer(Packets.AnswerPack(True), msg_id=msg_id)
except ConnectorConnectionClosedException:
pass
def notifyReady(self, conn): def notifyReady(self, conn):
self.app.setStorageReady(conn.getUUID()) self.app.setStorageReady(conn.getUUID())
......
...@@ -239,6 +239,7 @@ class Packet(object): ...@@ -239,6 +239,7 @@ class Packet(object):
a tuple respectively. a tuple respectively.
""" """
_ignore_when_closed = False
_header_format = None _header_format = None
_header_len = None _header_len = None
_request = None _request = None
...@@ -326,6 +327,13 @@ class Packet(object): ...@@ -326,6 +327,13 @@ class Packet(object):
def getAnswerClass(self): def getAnswerClass(self):
return self._answer return self._answer
def ignoreOnClosedConnection(self):
"""
Tells if this packet must be ignored when its connection is closed
when it is handled.
"""
return self._ignore_when_closed
class Notify(Packet): class Notify(Packet):
""" """
General purpose notification (remote logging) General purpose notification (remote logging)
...@@ -1718,7 +1726,7 @@ def initMessage(klass): ...@@ -1718,7 +1726,7 @@ def initMessage(klass):
klass._header_len = calcsize(klass._header_format) klass._header_len = calcsize(klass._header_format)
StaticRegistry = {} StaticRegistry = {}
def register(code, request, answer=None): def register(code, request, answer=None, ignore_when_closed=None):
""" Register a packet in the packet registry """ """ Register a packet in the packet registry """
# register the request # register the request
# assert code & RESPONSE_MASK == 0 # assert code & RESPONSE_MASK == 0
...@@ -1727,6 +1735,13 @@ def register(code, request, answer=None): ...@@ -1727,6 +1735,13 @@ def register(code, request, answer=None):
request._code = code request._code = code
request._answer = answer request._answer = answer
StaticRegistry[code] = request StaticRegistry[code] = request
if ignore_when_closed is None:
# By default, on a closed connection:
# - request: ignore
# - answer: keep
# - nofitication: keep
ignore_when_closed = answer is not None
request._ignore_when_closed = ignore_when_closed
if answer not in (None, Error): if answer not in (None, Error):
initMessage(answer) initMessage(answer)
# compute the answer code # compute the answer code
...@@ -1842,11 +1857,15 @@ class PacketRegistry(dict): ...@@ -1842,11 +1857,15 @@ class PacketRegistry(dict):
AskFinishTransaction, AnswerTransactionFinished = register( AskFinishTransaction, AnswerTransactionFinished = register(
0x0013, 0x0013,
AskFinishTransaction, AskFinishTransaction,
AnswerTransactionFinished) AnswerTransactionFinished,
ignore_when_closed=False,
)
AskLockInformation, AnswerInformationLocked = register( AskLockInformation, AnswerInformationLocked = register(
0x0014, 0x0014,
AskLockInformation, AskLockInformation,
AnswerInformationLocked) AnswerInformationLocked,
ignore_when_closed=False,
)
InvalidateObjects = register(0x0015, InvalidateObjects) InvalidateObjects = register(0x0015, InvalidateObjects)
NotifyUnlockInformation = register(0x0016, NotifyUnlockInformation) NotifyUnlockInformation = register(0x0016, NotifyUnlockInformation)
AskNewOIDs, AnswerNewOIDs = register( AskNewOIDs, AnswerNewOIDs = register(
...@@ -1889,11 +1908,15 @@ class PacketRegistry(dict): ...@@ -1889,11 +1908,15 @@ class PacketRegistry(dict):
SetNodeState = register( SetNodeState = register(
0x0023, 0x0023,
SetNodeState, SetNodeState,
Error) Error,
ignore_when_closed=False,
)
AddPendingNodes = register( AddPendingNodes = register(
0x0024, 0x0024,
AddPendingNodes, AddPendingNodes,
Error) Error,
ignore_when_closed=False,
)
AskNodeInformation, AnswerNodeInformation = register( AskNodeInformation, AnswerNodeInformation = register(
0x0025, 0x0025,
AskNodeInformation, AskNodeInformation,
...@@ -1901,7 +1924,9 @@ class PacketRegistry(dict): ...@@ -1901,7 +1924,9 @@ class PacketRegistry(dict):
SetClusterState = register( SetClusterState = register(
0x0026, 0x0026,
SetClusterState, SetClusterState,
Error) Error,
ignore_when_closed=False,
)
NotifyClusterInformation = register(0x0027, NotifyClusterInformation) NotifyClusterInformation = register(0x0027, NotifyClusterInformation)
AskClusterState, AnswerClusterState = register( AskClusterState, AnswerClusterState = register(
0x0028, 0x0028,
...@@ -1932,7 +1957,9 @@ class PacketRegistry(dict): ...@@ -1932,7 +1957,9 @@ class PacketRegistry(dict):
AskPack, AnswerPack = register( AskPack, AnswerPack = register(
0x0038, 0x0038,
AskPack, AskPack,
AnswerPack) AnswerPack,
ignore_when_closed=False,
)
AskCheckTIDRange, AnswerCheckTIDRange = register( AskCheckTIDRange, AnswerCheckTIDRange = register(
0x0039, 0x0039,
AskCheckTIDRange, AskCheckTIDRange,
......
...@@ -57,6 +57,7 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -57,6 +57,7 @@ class MasterOperationHandler(BaseMasterHandler):
if not tid in self.app.tm: if not tid in self.app.tm:
raise ProtocolError('Unknown transaction') raise ProtocolError('Unknown transaction')
self.app.tm.lock(tid, oid_list) self.app.tm.lock(tid, oid_list)
if not conn.isClosed():
conn.answer(Packets.AnswerInformationLocked(tid)) conn.answer(Packets.AnswerInformationLocked(tid))
def notifyUnlockInformation(self, conn, tid): def notifyUnlockInformation(self, conn, tid):
...@@ -70,5 +71,6 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -70,5 +71,6 @@ class MasterOperationHandler(BaseMasterHandler):
neo.logging.info('Pack started, up to %s...', dump(tid)) neo.logging.info('Pack started, up to %s...', dump(tid))
app.dm.pack(tid, app.tm.updateObjectDataForPack) app.dm.pack(tid, app.tm.updateObjectDataForPack)
neo.logging.info('Pack finished.') neo.logging.info('Pack finished.')
if not conn.isClosed():
conn.answer(Packets.AnswerPack(True)) conn.answer(Packets.AnswerPack(True))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment