Commit ca42b3ad authored by Tim Peters's avatar Tim Peters

Port from ZODB 3.2.

Fixed several thread and asyncore races in ZEO's connection dance.

ZEO/tests/ConnectionTests.py
    The pollUp() and pollDown() methods were pure busy loops whenever
    the asyncore socket map was empty, and at least on some flavors of
    Linux that starved the other thread(s) trying to do real work.
    This grossly increased the time needed to run tests using these, and
    sometimes caused bogus "timed out" test failures.

ZEO/zrpc/client.py
ZEO/zrpc/connection.py
    Renamed class ManagedConnection to ManagedClientConnection, for clarity.

    Moved the comment block about protocol negotiation from the guts of
    ManagedClientConnection to before the Connection base class -- the
    Connection constructor can't be understood without this context.  Added
    more words about the delicate protocol negotiation dance.

    Connection class:  made this an abstract base clase.  Derived classes
    _must_ implement the handshake() method.  There was really nothing in
    common between server and client wrt what handshake() needs to do, and
    it was confusing for one of them to use the base class handshake() while
    the other replaced handshake() completely.

    Connection.__init__:  It isn't safe to register with asyncore's socket
    map before special-casing for the first (protocol handshake) message is
    set up.  Repaired that.  Also removed the pointless "optionalness" of
    the optional arguments.

    ManagedClientConnection.__init__:  Added machinery to set up correct
    (thread-safe) message queueing.  There was an unrepairable hole before,
    in the transition between "I'm queueing msgs waiting for the server
    handshake" and "I'm done queueing messages":  it was impossible to know
    whether any calls to the client's "queue a message" method were in
    progress (in other threads), so impossible to make the transition safely
    in all cases.  The client had to grow its own message_output() method,
    with a mutex protecting the transition from thread races.

    Changed zrpc-conn log messages to include "(S)" for server-side or
    "(C)" for client-side.  This is especially helpful for figuring out
    logs produced while running the test suite (the server and client
    log messages end up in the same file then).
parent 5f60327d
What's new in ZODB3 3.3.1a2?
============================
Release date: DD-MMM-2005
ZEO
---
Repaired subtle race conditions in establishing ZEO connections, both client-
and server-side. These account for intermittent cases where ZEO failed
to make a connection (or reconnection), accompanied by a log message showing
an error caught in ``asyncore`` and having a traceback ending with:
``UnpicklingError: invalid load key, 'Z'.``
or:
``ZRPCError: bad handshake '(K\x00K\x00U\x0fgetAuthProtocol)t.'``
or:
``error: (9, 'Bad file descriptor')``
or an ``AttributeError``.
These were exacerbated when running the test suite, because of an unintended
busy loop in the test scaffolding, which could starve the thread trying to
make a connection. The ZEO reconnection tests may run much faster now,
depending on platform, and should suffer far fewer (if any) intermittent
"timed out waiting for storage to connect" failures.
What's new in ZODB3 3.3.1a1? What's new in ZODB3 3.3.1a1?
============================ ============================
Release date: 11-Jan-2005 Release date: 11-Jan-2005
......
...@@ -216,7 +216,7 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -216,7 +216,7 @@ class CommonSetupTearDown(StorageTestBase):
def pollUp(self, timeout=30.0, storage=None): def pollUp(self, timeout=30.0, storage=None):
if storage is None: if storage is None:
storage = self._storage storage = self._storage
# Poll until we're connected # Poll until we're connected.
now = time.time() now = time.time()
giveup = now + timeout giveup = now + timeout
while not storage.is_connected(): while not storage.is_connected():
...@@ -224,9 +224,15 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -224,9 +224,15 @@ class CommonSetupTearDown(StorageTestBase):
now = time.time() now = time.time()
if now > giveup: if now > giveup:
self.fail("timed out waiting for storage to connect") self.fail("timed out waiting for storage to connect")
# When the socket map is empty, poll() returns immediately,
# and this is a pure busy-loop then. At least on some Linux
# flavors, that can starve the thread trying to connect,
# leading to grossly increased runtime (typical) or bogus
# "timed out" failures. A little sleep here cures both.
time.sleep(0.1)
def pollDown(self, timeout=30.0): def pollDown(self, timeout=30.0):
# Poll until we're disconnected # Poll until we're disconnected.
now = time.time() now = time.time()
giveup = now + timeout giveup = now + timeout
while self._storage.is_connected(): while self._storage.is_connected():
...@@ -234,6 +240,8 @@ class CommonSetupTearDown(StorageTestBase): ...@@ -234,6 +240,8 @@ class CommonSetupTearDown(StorageTestBase):
now = time.time() now = time.time()
if now > giveup: if now > giveup:
self.fail("timed out waiting for storage to disconnect") self.fail("timed out waiting for storage to disconnect")
# See pollUp() for why we sleep a little here.
time.sleep(0.1)
class ConnectionTests(CommonSetupTearDown): class ConnectionTests(CommonSetupTearDown):
......
...@@ -27,7 +27,7 @@ from ZODB.loglevels import BLATHER ...@@ -27,7 +27,7 @@ from ZODB.loglevels import BLATHER
from ZEO.zrpc.log import log from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection from ZEO.zrpc.connection import ManagedClientConnection
class ConnectionManager(object): class ConnectionManager(object):
"""Keeps a connection up over time""" """Keeps a connection up over time"""
...@@ -476,8 +476,8 @@ class ConnectWrapper: ...@@ -476,8 +476,8 @@ class ConnectWrapper:
Call the client's testConnection(), giving the client a chance Call the client's testConnection(), giving the client a chance
to do app-level check of the connection. to do app-level check of the connection.
""" """
self.conn = ManagedConnection(self.sock, self.addr, self.conn = ManagedClientConnection(self.sock, self.addr,
self.client, self.mgr) self.client, self.mgr)
self.sock = None # The socket is now owned by the connection self.sock = None # The socket is now owned by the connection
try: try:
self.preferred = self.client.testConnection(self.conn) self.preferred = self.client.testConnection(self.conn)
......
...@@ -67,6 +67,64 @@ class MTDelay(Delay): ...@@ -67,6 +67,64 @@ class MTDelay(Delay):
self.ready.wait() self.ready.wait()
Delay.error(self, exc_info) Delay.error(self, exc_info)
# PROTOCOL NEGOTIATION
#
# The code implementing protocol version 2.0.0 (which is deployed
# in the field and cannot be changed) *only* talks to peers that
# send a handshake indicating protocol version 2.0.0. In that
# version, both the client and the server immediately send out
# their protocol handshake when a connection is established,
# without waiting for their peer, and disconnect when a different
# handshake is receive.
#
# The new protocol uses this to enable new clients to talk to
# 2.0.0 servers. In the new protocol:
#
# The server sends its protocol handshake to the client at once.
#
# The client waits until it receives the server's protocol handshake
# before sending its own handshake. The client sends the lower of its
# own protocol version and the server protocol version, allowing it to
# talk to servers using later protocol versions (2.0.2 and higher) as
# well: the effective protocol used will be the lower of the client
# and server protocol.
#
# [Ugly details: In order to treat the first received message (protocol
# handshake) differently than all later messages, both client and server
# start by patching their message_input() method to refer to their
# recv_handshake() method instead. In addition, the client has to arrange
# to queue (delay) outgoing messages until it receives the server's
# handshake, so that the first message the client sends to the server is
# the client's handshake. This multiply-special treatment of the first
# message is delicate, and several asyncore and thread subtleties were
# handled unsafely before ZODB 3.2.6.
# ]
#
# The ZEO modules ClientStorage and ServerStub have backwards
# compatibility code for dealing with the previous version of the
# protocol. The client accepts the old version of some messages,
# and will not send new messages when talking to an old server.
#
# As long as the client hasn't sent its handshake, it can't send
# anything else; output messages are queued during this time.
# (Output can happen because the connection testing machinery can
# start sending requests before the handshake is received.)
#
# UPGRADING FROM ZEO 2.0.0 TO NEWER VERSIONS:
#
# Because a new client can talk to an old server, but not vice
# versa, all clients should be upgraded before upgrading any
# servers. Protocol upgrades beyond 2.0.1 will not have this
# restriction, because clients using protocol 2.0.1 or later can
# talk to both older and newer servers.
#
# No compatibility with protocol version 1 is provided.
# Connection is abstract (it must be derived from). ManagedServerConnection
# and ManagedClientConnection are the concrete subclasses. They need to
# supply a handshake() method appropriate for their role in protocol
# negotiation.
class Connection(smac.SizedMessageAsyncConnection, object): class Connection(smac.SizedMessageAsyncConnection, object):
"""Dispatcher for RPC on object on both sides of socket. """Dispatcher for RPC on object on both sides of socket.
...@@ -136,18 +194,33 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -136,18 +194,33 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# getExtensionMethods(). # getExtensionMethods().
# getInvalidations(). # getInvalidations().
def __init__(self, sock, addr, obj=None): # Client constructor passes 'C' for tag, server constructor 'S'. This
# is used in log messages.
def __init__(self, sock, addr, obj, tag):
self.obj = None self.obj = None
self.marshal = Marshaller() self.marshal = Marshaller()
self.closed = False self.closed = False
self.msgid = 0 self.peer_protocol_version = None # set in recv_handshake()
self.peer_protocol_version = None # Set in recv_handshake()
self.logger = logging.getLogger('ZEO.zrpc.Connection') assert tag in "CS"
self.logger = logging.getLogger('ZEO.zrpc.Connection(%c)' % tag)
if isinstance(addr, types.TupleType): if isinstance(addr, types.TupleType):
self.log_label = "(%s:%d) " % addr self.log_label = "(%s:%d) " % addr
else: else:
self.log_label = "(%s) " % addr self.log_label = "(%s) " % addr
self.__super_init(sock, addr)
# Supply our own socket map, so that we don't get registered with
# the asyncore socket map just yet. The initial protocol messages
# are treated very specially, and we dare not get invoked by asyncore
# before that special-case setup is complete. Some of that setup
# occurs near the end of this constructor, and the rest is done by
# a concrete subclass's handshake() method. Unfortunately, because
# we ultimately derive from asyncore.dispatcher, it's not possible
# to invoke the superclass constructor without asyncore stuffing
# us into _some_ socket map.
ourmap = {}
self.__super_init(sock, addr, map=ourmap)
# A Connection either uses asyncore directly or relies on an # A Connection either uses asyncore directly or relies on an
# asyncore mainloop running in a separate thread. If # asyncore mainloop running in a separate thread. If
# thr_async is true, then the mainloop is running in a # thr_async is true, then the mainloop is running in a
...@@ -157,24 +230,43 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -157,24 +230,43 @@ class Connection(smac.SizedMessageAsyncConnection, object):
self.thr_async = False self.thr_async = False
self.trigger = None self.trigger = None
self._prepare_async() self._prepare_async()
# The singleton dict is used in synchronous mode when a method # The singleton dict is used in synchronous mode when a method
# needs to call into asyncore to try to force some I/O to occur. # needs to call into asyncore to try to force some I/O to occur.
# The singleton dict is a socket map containing only this object. # The singleton dict is a socket map containing only this object.
self._singleton = {self._fileno: self} self._singleton = {self._fileno: self}
# msgid_lock guards access to msgid # msgid_lock guards access to msgid
self.msgid = 0
self.msgid_lock = threading.Lock() self.msgid_lock = threading.Lock()
# replies_cond is used to block when a synchronous call is # replies_cond is used to block when a synchronous call is
# waiting for a response # waiting for a response
self.replies_cond = threading.Condition() self.replies_cond = threading.Condition()
self.replies = {} self.replies = {}
# waiting_for_reply is used internally to indicate whether # waiting_for_reply is used internally to indicate whether
# a call is in progress. setting a session key is deferred # a call is in progress. setting a session key is deferred
# until after the call returns. # until after the call returns.
self.waiting_for_reply = False self.waiting_for_reply = False
self.delay_sesskey = None self.delay_sesskey = None
self.register_object(obj) self.register_object(obj)
# The first message we see is a protocol handshake. message_input()
# is temporarily replaced by recv_handshake() to treat that message
# specially. revc_handshake() does "del self.message_input", which
# uncovers the normal message_input() method thereafter.
self.message_input = self.recv_handshake
# Server and client need to do different things for protocol
# negotiation, and handshake() is implemented differently in each.
self.handshake() self.handshake()
# Now it's safe to register with asyncore's socket map; it was not
# safe before message_input was replaced, or before handshake() was
# invoked.
asyncore.socket_map.update(ourmap)
def __repr__(self): def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.addr) return "<%s %s>" % (self.__class__.__name__, self.addr)
...@@ -192,7 +284,7 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -192,7 +284,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
self.__super_close() self.__super_close()
def close_trigger(self): def close_trigger(self):
# Overridden by ManagedConnection # Overridden by ManagedClientConnection.
if self.trigger is not None: if self.trigger is not None:
self.trigger.close() self.trigger.close()
...@@ -200,24 +292,26 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -200,24 +292,26 @@ class Connection(smac.SizedMessageAsyncConnection, object):
"""Register obj as the true object to invoke methods on.""" """Register obj as the true object to invoke methods on."""
self.obj = obj self.obj = obj
def handshake(self, proto=None): # Subclass must implement. handshake() is called by the constructor,
# Overridden by ManagedConnection # near its end, but before self is added to asyncore's socket map.
# When a connection is created the first message sent is a 4-byte
# When a connection is created the first message sent is a # protocol version. This allows the protocol to evolve over time, and
# 4-byte protocol version. This mechanism should allow the # lets servers handle clients using multiple versions of the protocol.
# protocol to evolve over time, and let servers handle clients # In general, the server's handshake() just needs to send the server's
# using multiple versions of the protocol. # preferred protocol; the client's also needs to queue (delay) outgoing
# messages until it sees the handshake from the server.
# The mechanism replaces the message_input() method for the def handshake(self):
# first message received. raise NotImplementedError
# The client sends the protocol version it is using. # Replaces message_input() for the first message received. Records the
self.message_input = self.recv_handshake # protocol sent by the peer in `peer_protocol_version`, restores the
self.message_output(proto or self.protocol_version) # normal message_input() method, and raises an exception if the peer's
# protocol is unacceptable. That's all the server needs to do. The
# client needs to do additional work in response to the server's
# handshake, and extends this method.
def recv_handshake(self, proto): def recv_handshake(self, proto):
# Extended by ManagedConnection # Extended by ManagedClientConnection.
del self.message_input del self.message_input # uncover normal-case message_input()
self.peer_protocol_version = proto self.peer_protocol_version = proto
if self.oldest_protocol_version <= proto <= self.protocol_version: if self.oldest_protocol_version <= proto <= self.protocol_version:
self.log("received handshake %r" % proto, level=logging.INFO) self.log("received handshake %r" % proto, level=logging.INFO)
...@@ -227,7 +321,7 @@ class Connection(smac.SizedMessageAsyncConnection, object): ...@@ -227,7 +321,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
raise ZRPCError("bad handshake %r" % proto) raise ZRPCError("bad handshake %r" % proto)
def message_input(self, message): def message_input(self, message):
"""Decoding an incoming message and dispatch it""" """Decode an incoming message and dispatch it"""
# If something goes wrong during decoding, the marshaller # If something goes wrong during decoding, the marshaller
# will raise an exception. The exception will ultimately # will raise an exception. The exception will ultimately
# result in asycnore calling handle_error(), which will # result in asycnore calling handle_error(), which will
...@@ -563,81 +657,85 @@ class ManagedServerConnection(Connection): ...@@ -563,81 +657,85 @@ class ManagedServerConnection(Connection):
def __init__(self, sock, addr, obj, mgr): def __init__(self, sock, addr, obj, mgr):
self.mgr = mgr self.mgr = mgr
self.__super_init(sock, addr, obj) self.__super_init(sock, addr, obj, 'S')
self.obj.notifyConnected(self) self.obj.notifyConnected(self)
def handshake(self):
# Send the server's preferred protocol to the client.
self.message_output(self.protocol_version)
def close(self): def close(self):
self.obj.notifyDisconnected() self.obj.notifyDisconnected()
self.mgr.close_conn(self) self.mgr.close_conn(self)
self.__super_close() self.__super_close()
class ManagedConnection(Connection): class ManagedClientConnection(Connection):
"""Client-side Connection subclass.""" """Client-side Connection subclass."""
__super_init = Connection.__init__ __super_init = Connection.__init__
__super_close = Connection.close __super_close = Connection.close
base_message_output = Connection.message_output
def __init__(self, sock, addr, obj, mgr): def __init__(self, sock, addr, obj, mgr):
self.mgr = mgr self.mgr = mgr
self.__super_init(sock, addr, obj)
# We can't use the base smac's message_output directly because the
# client needs to queue outgoing messages until it's seen the
# initial protocol handshake from the server. So we have our own
# message_ouput() method, and support for initial queueing. This is
# a delicate design, requiring an output mutex to be wholly
# thread-safe.
# Caution: we must set this up before calling the base class
# constructor, because the latter registers us with asyncore;
# we need to guarantee that we'll queue outgoing messages before
# asyncore learns about us.
self.output_lock = threading.Lock()
self.queue_output = True
self.queued_messages = []
self.__super_init(sock, addr, obj, tag='C')
self.check_mgr_async() self.check_mgr_async()
# PROTOCOL NEGOTIATION: # Our message_ouput() queues messages until recv_handshake() gets the
# # protocol handshake from the server.
# The code implementing protocol version 2.0.0 (which is deployed def message_output(self, message):
# in the field and cannot be changed) *only* talks to peers that self.output_lock.acquire()
# send a handshake indicating protocol version 2.0.0. In that try:
# version, both the client and the server immediately send out if self.queue_output:
# their protocol handshake when a connection is established, self.queued_messages.append(message)
# without waiting for their peer, and disconnect when a different else:
# handshake is receive. assert not self.queued_messages
# self.base_message_output(message)
# The new protocol uses this to enable new clients to talk to finally:
# 2.0.0 servers: in the new protocol, the client waits until it self.output_lock.release()
# receives the server's protocol handshake before sending its own
# handshake. The client sends the lower of its own protocol
# version and the server protocol version, allowing it to talk to
# servers using later protocol versions (2.0.2 and higher) as
# well: the effective protocol used will be the lower of the
# client and server protocol.
#
# The ZEO modules ClientStorage and ServerStub have backwards
# compatibility code for dealing with the previous version of the
# protocol. The client accept the old version of some messages,
# and will not send new messages when talking to an old server.
#
# As long as the client hasn't sent its handshake, it can't send
# anything else; output messages are queued during this time.
# (Output can happen because the connection testing machinery can
# start sending requests before the handshake is received.)
#
# UPGRADING FROM ZEO 2.0.0 TO NEWER VERSIONS:
#
# Because a new client can talk to an old server, but not vice
# versa, all clients should be upgraded before upgrading any
# servers. Protocol upgrades beyond 2.0.1 will not have this
# restriction, because clients using protocol 2.0.1 or later can
# talk to both older and newer servers.
#
# No compatibility with protocol version 1 is provided.
def handshake(self): def handshake(self):
self.message_input = self.recv_handshake # The client waits to see the server's handshake. Outgoing messages
self.message_output = self.queue_output # are queued for the duration. The client will send its own
self.output_queue = [] # handshake after the server's handshake is seen, in recv_handshake()
# The handshake is sent by recv_handshake() below # below. It will then send any messages queued while waiting.
assert self.queue_output # the constructor already set this
def queue_output(self, message):
self.output_queue.append(message)
def recv_handshake(self, proto): def recv_handshake(self, proto):
del self.message_output # The protocol to use is the older of our and the server's preferred
# protocols.
proto = min(proto, self.protocol_version) proto = min(proto, self.protocol_version)
Connection.recv_handshake(self, proto) # Raise error if wrong proto
self.message_output(proto) # Restore the normal message_input method, and raise an exception
queue = self.output_queue # if the protocol version is too old.
del self.output_queue Connection.recv_handshake(self, proto)
for message in queue:
self.message_output(message) # Tell the server the protocol in use, then send any messages that
# were queued while waiting to hear the server's protocol, and stop
# queueing messages.
self.output_lock.acquire()
try:
self.base_message_output(proto)
for message in self.queued_messages:
self.base_message_output(message)
self.queued_messages = []
self.queue_output = False
finally:
self.output_lock.release()
# Defer the ThreadedAsync work to the manager. # Defer the ThreadedAsync work to the manager.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment