Commit f52899a9 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Check if a MT connection is closed in ask()

This solve a race condition where the client's thread poll triggers
connection(Lost|Failed) before a call to ask(). In that case a answer was
registered as expected and never cancelled, which lead to a frozen client.

- Split _connectToPrimaryNode
- Move ConnectionClosed at generic level

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2357 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 3bcf53d2
...@@ -34,11 +34,11 @@ from neo.protocol import NodeTypes, Packets, INVALID_PARTITION, ZERO_TID ...@@ -34,11 +34,11 @@ from neo.protocol import NodeTypes, Packets, INVALID_PARTITION, ZERO_TID
from neo.event import EventManager from neo.event import EventManager
from neo.util import makeChecksum as real_makeChecksum, dump from neo.util import makeChecksum as real_makeChecksum, dump
from neo.locking import Lock from neo.locking import Lock
from neo.connection import MTClientConnection, OnTimeout from neo.connection import MTClientConnection, OnTimeout, ConnectionClosed
from neo.node import NodeManager from neo.node import NodeManager
from neo.connector import getConnectorHandler from neo.connector import getConnectorHandler
from neo.client.exception import NEOStorageError, NEOStorageCreationUndoneError from neo.client.exception import NEOStorageError, NEOStorageCreationUndoneError
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed from neo.client.exception import NEOStorageNotFoundError
from neo.exception import NeoException from neo.exception import NeoException
from neo.client.handlers import storage, master from neo.client.handlers import storage, master
from neo.dispatcher import Dispatcher, ForgottenPacket from neo.dispatcher import Dispatcher, ForgottenPacket
...@@ -314,6 +314,9 @@ class Application(object): ...@@ -314,6 +314,9 @@ class Application(object):
@profiler_decorator @profiler_decorator
def _connectToPrimaryNode(self): def _connectToPrimaryNode(self):
"""
Lookup for the current primary master node
"""
logging.debug('connecting to primary master...') logging.debug('connecting to primary master...')
ready = False ready = False
nm = self.nm nm = self.nm
...@@ -349,8 +352,8 @@ class Application(object): ...@@ -349,8 +352,8 @@ class Application(object):
logging.error('Connection to master node %s failed', logging.error('Connection to master node %s failed',
self.trying_master_node) self.trying_master_node)
continue continue
msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
try: try:
msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
self._waitMessage(conn, msg_id, self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler) handler=self.primary_bootstrap_handler)
except ConnectionClosed: except ConnectionClosed:
...@@ -358,40 +361,44 @@ class Application(object): ...@@ -358,40 +361,44 @@ class Application(object):
# If we reached the primary master node, mark as connected # If we reached the primary master node, mark as connected
connected = self.primary_master_node is not None and \ connected = self.primary_master_node is not None and \
self.primary_master_node is self.trying_master_node self.primary_master_node is self.trying_master_node
logging.info('Connected to %s' % (self.primary_master_node, ))
logging.info('connected to a primary master node') try:
# Identify to primary master and request initial data ready = self.identifyToPrimaryNode(conn)
while conn.getUUID() is None: except ConnectionClosed:
if conn.getConnector() is None: logging.error('Connection to %s lost', self.trying_master_node)
logging.error('Connection to master node %s lost', self.primary_master_node = None
self.trying_master_node) continue
self.primary_master_node = None logging.info("Connected and ready")
break
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name)
msg_id = conn.ask(p, queue=queue)
try:
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
except ConnectionClosed:
self.primary_master_node = None
break
if conn.getUUID() is None:
# Node identification was refused by master.
time.sleep(1)
if self.uuid is not None:
msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
ready = self.uuid is not None and self.pt is not None \
and self.pt.operational()
logging.info("connected to primary master node %s" %
self.primary_master_node)
return conn return conn
def identifyToPrimaryNode(self, conn):
"""
Request identification and required informations to be operational.
Might raise ConnectionClosed so that the new primary can be
looked-up again.
"""
logging.info('Initializing from master')
queue = self.local_var.queue
# Identify to primary master and request initial data
while conn.getUUID() is None:
p = Packets.RequestIdentification(NodeTypes.CLIENT, self.uuid,
None, self.name)
self._waitMessage(conn, conn.ask(p, queue=queue),
handler=self.primary_bootstrap_handler)
if conn.getUUID() is None:
# Node identification was refused by master, it is considered
# as the primary as long as we are connected to it.
time.sleep(1)
if self.uuid is not None:
msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
self._waitMessage(conn, msg_id,
handler=self.primary_bootstrap_handler)
return self.uuid is not None and self.pt is not None \
and self.pt.operational()
def registerDB(self, db, limit): def registerDB(self, db, limit):
self._db = db self._db = db
......
...@@ -17,9 +17,6 @@ ...@@ -17,9 +17,6 @@
from ZODB import POSException from ZODB import POSException
class ConnectionClosed(Exception):
pass
class NEOStorageError(POSException.StorageError): class NEOStorageError(POSException.StorageError):
pass pass
......
...@@ -19,8 +19,7 @@ ...@@ -19,8 +19,7 @@
from neo import logging from neo import logging
from neo.locking import RLock from neo.locking import RLock
from neo.protocol import NodeTypes, Packets from neo.protocol import NodeTypes, Packets
from neo.connection import MTClientConnection from neo.connection import MTClientConnection, ConnectionClosed
from neo.client.exception import ConnectionClosed
from neo.profiling import profiler_decorator from neo.profiling import profiler_decorator
import time import time
......
...@@ -36,6 +36,9 @@ PING_TIMEOUT = 5 ...@@ -36,6 +36,9 @@ PING_TIMEOUT = 5
INCOMING_TIMEOUT = 10 INCOMING_TIMEOUT = 10
CRITICAL_TIMEOUT = 30 CRITICAL_TIMEOUT = 30
class ConnectionClosed(Exception):
pass
def not_closed(func): def not_closed(func):
def decorator(self, *args, **kw): def decorator(self, *args, **kw):
if self.connector is None: if self.connector is None:
...@@ -734,6 +737,8 @@ class MTClientConnection(ClientConnection): ...@@ -734,6 +737,8 @@ class MTClientConnection(ClientConnection):
queue=None): queue=None):
self.lock() self.lock()
try: try:
if self.isClosed():
raise ConnectionClosed
# XXX: Here, we duplicate Connection.ask because we need to call # XXX: Here, we duplicate Connection.ask because we need to call
# self.dispatcher.register after setId is called and before # self.dispatcher.register after setId is called and before
# _addPacket is called. # _addPacket is called.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment