Commit 19745e7c authored by Julien Muchembled's avatar Julien Muchembled

Small optimizations & cleanups

parent 5b69d553
......@@ -25,7 +25,7 @@
becomes true, without waking up needlessly in the meantime) or null
(don't wait at all).
- Implements delayed connection acceptation.
Currently, any node that connects to early to another that is busy for
Currently, any node that connects too early to another that is busy for
some reasons is immediately rejected with the 'not ready' error code. This
should be replaced by a queue in the listening node that keep a pool a
nodes that will be accepted late, when the conditions will be satisfied.
......
......@@ -27,8 +27,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
""" Bootstrap handler used when looking for the primary master """
def notReady(self, conn, message):
app = self.app
app.trying_master_node = None
self.app.trying_master_node = None
def _acceptIdentification(self, node, uuid, num_partitions,
num_replicas, your_uuid, primary, known_master_list):
......
......@@ -63,15 +63,12 @@ class ConnectionPool(object):
app._ask(conn, p, handler=app.storage_bootstrap_handler)
except ConnectionClosed:
logging.error('Connection to %r failed', node)
self.notifyFailure(node)
conn = None
except NodeNotReady:
logging.info('%r not ready', node)
self.notifyFailure(node)
conn = None
else:
logging.info('Connected %r', node)
return conn
self.notifyFailure(node)
def _dropConnections(self):
"""Drop connections."""
......
......@@ -283,12 +283,11 @@ class BaseConnection(object):
def _getReprInfo(self):
return [
('uuid', uuid_str(self.getUUID())),
('address', self.addr and '%s:%d' % self.addr or '?'),
('address', '%s:%u' % self.addr if self.addr else '?'),
('handler', self.getHandler()),
], ['closed'] if self.isClosed() else []
def __repr__(self):
address = self.addr and '%s:%d' % self.addr or '?'
r, flags = self._getReprInfo()
r = map('%s=%s'.__mod__, r)
r += flags
......@@ -306,9 +305,6 @@ class BaseConnection(object):
else:
logging.debug('Delay handler %r on %r', handler, self)
def getEventManager(self):
return self.em
def getUUID(self):
return None
......@@ -357,7 +353,7 @@ class ListeningConnection(BaseConnection):
new_s, addr = self.connector.getNewConnection()
logging.debug('accepted a connection from %s:%d', *addr)
handler = self.getHandler()
new_conn = ServerConnection(self.getEventManager(), handler,
new_conn = ServerConnection(self.em, handler,
connector=new_s, addr=addr)
handler.connectionAccepted(new_conn)
except ConnectorTryAgainException:
......@@ -366,9 +362,6 @@ class ListeningConnection(BaseConnection):
def getAddress(self):
return self.connector.getAddress()
def writable(self):
return False
def isListening(self):
return True
......@@ -466,8 +459,7 @@ class Connection(BaseConnection):
def checkTimeout(self, t):
# first make sure we don't timeout on answers we already received
if self._base_timeout and not self._queue:
timeout = t - self._base_timeout
if self._timeout <= timeout:
if self._timeout <= t - self._base_timeout:
handlers = self._handlers
if handlers.isPending():
msg_id = handlers.timeout(self)
......@@ -521,15 +513,14 @@ class Connection(BaseConnection):
"""
Returns True if there are messages queued and awaiting processing.
"""
return len(self._queue) != 0
return not not self._queue
def process(self):
"""
Process a pending packet.
"""
# check out packet and process it with current handler
packet = self._queue.pop(0)
self._handlers.handle(self, packet)
self._handlers.handle(self, self._queue.pop(0))
self.updateTimeout()
def pending(self):
......@@ -665,7 +656,7 @@ class Connection(BaseConnection):
packet.setId(msg_id)
self._addPacket(packet)
handlers = self._handlers
t = not handlers.isPending() and time() or None
t = None if handlers.isPending() else time()
handlers.emit(packet, timeout, on_timeout, kw)
self.updateTimeout(t)
return msg_id
......@@ -784,7 +775,7 @@ class MTClientConnection(ClientConnection):
self.dispatcher.register(self, msg_id, queue)
self._addPacket(packet)
handlers = self._handlers
t = not handlers.isPending() and time() or None
t = None if handlers.isPending() else time()
handlers.emit(packet, timeout, on_timeout, kw)
self.updateTimeout(t)
return msg_id
......
......@@ -88,9 +88,6 @@ class SocketConnector:
def getError(self):
return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
def getAddress(self):
raise NotImplementedError
def getDescriptor(self):
# this descriptor must only be used by the event manager, where it
# guarantee unicity only while the connector is opened and registered
......@@ -142,31 +139,24 @@ class SocketConnector:
return self.socket.close()
def __repr__(self):
if self.is_closed:
fileno = '?'
else:
fileno = self.socket_fd
result = '<%s at 0x%x fileno %s %s, ' % (self.__class__.__name__,
id(self), fileno, self.socket.getsockname())
if self.is_closed is None:
result += 'never opened'
state = 'never opened'
else:
if self.is_closed:
result += 'closed '
state = 'closed '
else:
result += 'opened '
state = 'opened '
if self.is_listening:
result += 'listening'
state += 'listening'
else:
if self.accepted_from is None:
result += 'to'
state += 'to '
else:
result += 'from'
result += ' %s' % (self.remote_addr, )
return result + '>'
def _accept(self):
raise NotImplementedError
state += 'from '
state += str(self.remote_addr)
return '<%s at 0x%x fileno %s %s, %s>' % (self.__class__.__name__,
id(self), '?' if self.is_closed else self.socket_fd,
self.getAddress(), state)
class SocketConnectorIPv4(SocketConnector):
" Wrapper for IPv4 sockets"
......
......@@ -48,11 +48,9 @@ class NeoCTL(object):
if not self.connected:
self.connection = ClientConnection(self.em, self.handler,
node=self.server, connector=self.connector_handler())
while self.connection is not None:
if self.connected:
break
while not self.connected:
self.em.poll(1)
else:
if self.connection is None:
raise NotReadyException('not connected')
return self.connection
......
......@@ -346,27 +346,25 @@ class Application(object):
def queueEvent(self, some_callable, conn=None, args=(), key=None,
raise_on_duplicate=True):
msg_id = None if conn is None else conn.getPeerId()
event_queue_dict = self.event_queue_dict
if raise_on_duplicate and key in event_queue_dict:
n = event_queue_dict.get(key)
if n and raise_on_duplicate:
raise AlreadyPendingError()
else:
msg_id = None if conn is None else conn.getPeerId()
self.event_queue.append((key, some_callable, msg_id, conn, args))
if key is not None:
try:
event_queue_dict[key] += 1
except KeyError:
event_queue_dict[key] = 1
event_queue_dict[key] = n + 1 if n else 1
def executeQueuedEvents(self):
l = len(self.event_queue)
p = self.event_queue.popleft
event_queue_dict = self.event_queue_dict
for _ in xrange(l):
for _ in xrange(len(self.event_queue)):
key, some_callable, msg_id, conn, args = p()
if key is not None:
event_queue_dict[key] -= 1
if event_queue_dict[key] == 0:
n = event_queue_dict[key] - 1
if n:
event_queue_dict[key] = n
else:
del event_queue_dict[key]
if conn is None:
some_callable(*args)
......
......@@ -628,7 +628,7 @@ class ConnectionTests(NeoUnitTestBase):
self._checkConnectionCompleted(1)
self._checkConnectionFailed(0)
# check call to event manager
self.assertFalse(bc.getEventManager() is None)
self.assertIsNot(bc.em, None)
self._checkReaderAdded(1)
self._checkWriterAdded(0)
......@@ -652,7 +652,7 @@ class ConnectionTests(NeoUnitTestBase):
self._checkConnectionCompleted(0)
self._checkConnectionFailed(0)
# check call to event manager
self.assertFalse(bc.getEventManager() is None)
self.assertIsNot(bc.em, None)
self._checkReaderAdded(1)
self._checkWriterAdded(1)
......
......@@ -183,7 +183,6 @@ class ServerNode(Node):
class __metaclass__(type):
def __init__(cls, name, bases, d):
type.__init__(cls, name, bases, d)
if Node not in bases and threading.Thread not in cls.__mro__:
cls.__bases__ = bases + (threading.Thread,)
cls.node_type = getattr(NodeTypes, name[:-11].upper())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment