Commit 0eceaf66 authored by Jim Fulton's avatar Jim Fulton

Fixed new_addr()

When changing addresses (think ZooKeeper :) ), we needed to be more
careful about current connection status. If we're still trying to
connect, we should stop and start over with the new addresses.
parent 8c25d2bf
......@@ -282,7 +282,18 @@ class ClientStorage(object):
def new_addr(self, addr):
self._addr = addr
self._server.new_addrs(addr)
self._server.new_addrs(self._normalize_addr(addr))
def _normalize_addr(self, addr):
if isinstance(addr, int):
addr = ('127.0.0.1', addr)
if isinstance(addr, str):
addr = [addr]
elif (isinstance(addr, tuple) and len(addr) == 2 and
isinstance(addr[0], str) and isinstance(addr[1], int)):
addr = [addr]
return addr
def close(self):
"Storage API: finalize the storage, releasing external resources."
......
......@@ -105,7 +105,7 @@ class Protocol(asyncio.Protocol):
@cr.add_done_callback
def done_connecting(future):
if future.exception() is not None:
logger.info("Connection to %rfailed, retrying, %s",
logger.info("Connection to %r failed, retrying, %s",
self.addr, future.exception())
# keep trying
if not self.closed:
......@@ -384,6 +384,20 @@ class Client:
self.protocols = ()
self.disconnected(None)
def new_addrs(self, addrs):
self.addrs = addrs
if self.trying_to_connect():
self.disconnected(None)
def trying_to_connect(self):
"""Return whether we're trying to connect
Either because we're disconnected, or because we're connected
read-only, but want a writable connection if we can get one.
"""
return (not self.ready or
self.is_read_only() and self.read_only is Fallback)
closed = False
def close(self):
if not self.closed:
......@@ -443,7 +457,7 @@ class Client:
protocol.close() # too late, we went home with another
def register_failed(self, protocol, exc):
# A protcol failed registration. That's weird. If they've all
# A protocol failed registration. That's weird. If they've all
# failed, we should try again in a bit.
protocol.close()
logger.exception("Registration or cache validation failed, %s", exc)
......@@ -755,10 +769,16 @@ class ClientRunner:
self.__call = call_closed
def new_addr(self, addrs):
def apply_threadsafe(self, future, func, *args):
try:
future.set_result(func(*args))
except Exception as exc:
future.set_exception(exc)
def new_addrs(self, addrs):
# This usually doesn't have an immediate effect, since the
# addrs aren't used until the client disconnects.xs
self.client.addrs = addrs
self.__call(self.apply_threadsafe, self.client.new_addrs, addrs)
def wait(self, timeout=None):
if timeout is None:
......
......@@ -29,6 +29,8 @@ Now, we'll restart the server and update the connection:
>>> server = ZEO.StorageServer.StorageServer(
... ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
>>> server.start_thread()
>>> client.new_addr(server.addr)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment