Commit 7064bb21 authored by Jim Fulton's avatar Jim Fulton

- Storage servers now emit Serving and Closed events so subscribers

  can discover addresses when dynamic port assignment (bind to port 0)
  is used. This could, for example, be used to update address
  information in a ZooKeeper database.

- Client storagers have a method, new_addr, that can be used to change
  the server address(es). This can be used, for example, to update a
  dynamically determined server address from information in a
  ZooKeeper database.

- Moved some responsibility from runzeo to StorageServer to make it
  easier to use storage servers without runzeo.
parent 140d42b4
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
Change History Change History
================ ================
3.11.0 (2010-??-??) 3.11.0a1 (2011-??-??)
=================== =====================
New Features New Features
------------ ------------
...@@ -19,6 +19,16 @@ New Features ...@@ -19,6 +19,16 @@ New Features
comparison inherited from object. (This doesn't apply to old-style comparison inherited from object. (This doesn't apply to old-style
class instances.) class instances.)
- Storage servers now emit Serving and Closed events so subscribers
can discover addresses when dynamic port assignment (bind to port 0)
is used. This could, for example, be used to update address
information in a ZooKeeper database.
- Client storagers have a method, new_addr, that can be used to change
the server address(es). This can be used, for example, to update a
dynamically determined server address from information in a
ZooKeeper database.
3.10.5 (2011-11-19) 3.10.5 (2011-11-19)
=================== ===================
......
...@@ -424,7 +424,9 @@ class ClientStorage(object): ...@@ -424,7 +424,9 @@ class ClientStorage(object):
if not self._rpc_mgr.attempt_connect(): if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect() self._rpc_mgr.connect()
def new_addr(self, addr):
self._addr = addr
self._rpc_mgr.new_addrs(addr)
def _wait(self, timeout=None): def _wait(self, timeout=None):
if timeout is not None: if timeout is not None:
...@@ -503,10 +505,15 @@ class ClientStorage(object): ...@@ -503,10 +505,15 @@ class ClientStorage(object):
""" """
self._db = db self._db = db
def is_connected(self): def is_connected(self, test=False):
"""Return whether the storage is currently connected to a server.""" """Return whether the storage is currently connected to a server."""
# This function is used by clients, so we only report that a # This function is used by clients, so we only report that a
# connection exists when the connection is ready to use. # connection exists when the connection is ready to use.
if test:
try:
self._server.lastTransaction()
except Exception:
pass
return self._ready.isSet() return self._ready.isSet()
def sync(self): def sync(self):
......
...@@ -46,6 +46,7 @@ import transaction ...@@ -46,6 +46,7 @@ import transaction
import warnings import warnings
import ZEO.zrpc.error import ZEO.zrpc.error
import ZODB.blob import ZODB.blob
import ZODB.event
import ZODB.serialize import ZODB.serialize
import ZODB.TimeStamp import ZODB.TimeStamp
import zope.interface import zope.interface
...@@ -782,18 +783,20 @@ class StorageServer: ...@@ -782,18 +783,20 @@ class StorageServer:
# Classes we instantiate. A subclass might override. # Classes we instantiate. A subclass might override.
DispatcherClass = Dispatcher DispatcherClass = ZEO.zrpc.server.Dispatcher
ZEOStorageClass = ZEOStorage ZEOStorageClass = ZEOStorage
ManagedServerConnectionClass = ManagedServerConnection ManagedServerConnectionClass = ManagedServerConnection
def __init__(self, addr, storages, read_only=0, def __init__(self, addr, storages,
read_only=0,
invalidation_queue_size=100, invalidation_queue_size=100,
invalidation_age=None, invalidation_age=None,
transaction_timeout=None, transaction_timeout=None,
monitor_address=None, monitor_address=None,
auth_protocol=None, auth_protocol=None,
auth_database=None, auth_database=None,
auth_realm=None): auth_realm=None,
):
"""StorageServer constructor. """StorageServer constructor.
This is typically invoked from the start.py script. This is typically invoked from the start.py script.
...@@ -891,8 +894,13 @@ class StorageServer: ...@@ -891,8 +894,13 @@ class StorageServer:
storage.registerDB(StorageServerDB(self, name)) storage.registerDB(StorageServerDB(self, name))
self.invalidation_age = invalidation_age self.invalidation_age = invalidation_age
self.connections = {} self.connections = {}
self.dispatcher = self.DispatcherClass(addr, self.socket_map = {}
factory=self.new_connection) self.dispatcher = self.DispatcherClass(
addr, factory=self.new_connection, map=self.socket_map)
if len(self.addr) == 2 and self.addr[1] == 0 and self.addr[0]:
self.addr = self.dispatcher.socket.getsockname()
ZODB.event.notify(
Serving(self, address=self.dispatcher.socket.getsockname()))
self.stats = {} self.stats = {}
self.timeouts = {} self.timeouts = {}
for name in self.storages.keys(): for name in self.storages.keys():
...@@ -1137,26 +1145,53 @@ class StorageServer: ...@@ -1137,26 +1145,53 @@ class StorageServer:
return latest_tid, list(oids) return latest_tid, list(oids)
def close_server(self): def loop(self):
try:
asyncore.loop(map=self.socket_map)
except Exception:
if not self.__closed:
raise # Unexpected exc
__thread = None
def start_thread(self, daemon=True):
self.__thread = thread = threading.Thread(target=self.loop)
thread.setDaemon(daemon)
thread.start()
__closed = False
def close(self, join_timeout=1):
"""Close the dispatcher so that there are no new connections. """Close the dispatcher so that there are no new connections.
This is only called from the test suite, AFAICT. This is only called from the test suite, AFAICT.
""" """
if self.__closed:
return
self.__closed = True
# Stop accepting connections
self.dispatcher.close() self.dispatcher.close()
if self.monitor is not None: if self.monitor is not None:
self.monitor.close() self.monitor.close()
# Force the asyncore mainloop to exit by hackery, i.e. close
# every socket in the map. loop() will return when the map is ZODB.event.notify(Closed(self))
# empty.
for s in asyncore.socket_map.values(): # Close open client connections
for sid, connections in self.connections.items():
for conn in connections[:]:
try: try:
s.close() conn.connection.close()
except: except:
pass pass
asyncore.socket_map.clear()
for storage in self.storages.values(): for name, storage in self.storages.iteritems():
logger.info("closing storage %r", name)
storage.close() storage.close()
if self.__thread is not None:
self.__thread.join(join_timeout)
close_server = close
def close_conn(self, conn): def close_conn(self, conn):
"""Internal: remove the given connection from self.connections. """Internal: remove the given connection from self.connections.
...@@ -1570,3 +1605,16 @@ class CommitLog: ...@@ -1570,3 +1605,16 @@ class CommitLog:
if self.file: if self.file:
self.file.close() self.file.close()
self.file = None self.file = None
class ServerEvent:
def __init__(self, server, **kw):
self.__dict__.update(kw)
self.server = server
class Serving(ServerEvent):
pass
class Closed(ServerEvent):
pass
...@@ -160,7 +160,7 @@ class ZEOServer: ...@@ -160,7 +160,7 @@ class ZEOServer:
self.create_server() self.create_server()
self.loop_forever() self.loop_forever()
finally: finally:
self.close_storages() self.server.close_server()
self.clear_socket() self.clear_socket()
self.remove_pidfile() self.remove_pidfile()
...@@ -178,6 +178,10 @@ class ZEOServer: ...@@ -178,6 +178,10 @@ class ZEOServer:
root.addHandler(handler) root.addHandler(handler)
def check_socket(self): def check_socket(self):
if (isinstance(self.options.address, tuple) and
self.options.address[1] is None):
self.options.address = self.options.address[0], 0
return
if self.can_connect(self.options.family, self.options.address): if self.can_connect(self.options.family, self.options.address):
self.options.usage("address %s already in use" % self.options.usage("address %s already in use" %
repr(self.options.address)) repr(self.options.address))
...@@ -254,7 +258,7 @@ class ZEOServer: ...@@ -254,7 +258,7 @@ class ZEOServer:
if self.options.testing_exit_immediately: if self.options.testing_exit_immediately:
print "testing exit immediately" print "testing exit immediately"
else: else:
asyncore.loop() self.server.loop()
def handle_sigterm(self): def handle_sigterm(self):
log("terminated by SIGTERM") log("terminated by SIGTERM")
...@@ -289,20 +293,6 @@ class ZEOServer: ...@@ -289,20 +293,6 @@ class ZEOServer:
handler.rotate() handler.rotate()
log("Log files rotation complete", level=logging.INFO) log("Log files rotation complete", level=logging.INFO)
def close_storages(self):
for name, storage in self.storages.items():
log("closing storage %r" % name)
try:
storage.close()
except: # Keep going
log("failed to close storage %r" % name,
level=logging.ERROR, exc_info=True)
def _get_pidfile(self): def _get_pidfile(self):
pidfile = self.options.pid_file pidfile = self.options.pid_file
# 'pidfile' is marked as not required. # 'pidfile' is marked as not required.
......
The storage server can be told to bind to port 0, allowing the OS to
pick a port dynamically. For this to be useful, there needs to be a
way to tell someone. For this reason, the server posts events to
ZODB.notify.
>>> import ZODB.event
>>> old_notify = ZODB.event.notify
>>> last_event = None
>>> def notify(event):
... global last_event
... last_event = event
>>> ZODB.event.notify = notify
Now, let's start a server and verify that we get a serving event:
>>> import ZEO.StorageServer, ZODB.MappingStorage
>>> server = ZEO.StorageServer.StorageServer(
... ('127.0.0.1', 0), {'1': ZODB.MappingStorage.MappingStorage()})
>>> isinstance(last_event, ZEO.StorageServer.Serving)
True
>>> last_event.server is server
True
>>> last_event.address[0], last_event.address[1] > 0
('127.0.0.1', True)
If the host part pf the address passed to the constructor is not an
empty string. then the server addr attribute is the same as the
address attribute of the event:
>>> server.addr == last_event.address
True
Let's run the server in a thread, to make sure we can connect.
>>> server.start_thread()
>>> client = ZEO.client(last_event.address)
>>> client.is_connected()
True
If we close the server, we'll get a closed event:
>>> server.close()
>>> isinstance(last_event, ZEO.StorageServer.Closed)
True
>>> last_event.server is server
True
>>> wait_until(lambda : not client.is_connected(test=True))
>>> client.close()
If we pass an empty string as the host part of the server address, we
can't really assign a single address, so the server addr attribute is
left alone:
>>> server = ZEO.StorageServer.StorageServer(
... ('', 0), {'1': ZODB.MappingStorage.MappingStorage()})
>>> isinstance(last_event, ZEO.StorageServer.Serving)
True
>>> last_event.server is server
True
>>> last_event.address[1] > 0
True
If the host part pf the address passed to the constructor is not an
empty string. then the server addr attribute is the same as the
address attribute of the event:
>>> server.addr
('', 0)
>>> server.close()
The runzeo module provides some process support, including getting the
server configuration via a ZConfig configuration file. To spell a
dynamic port using ZConfig, you'd use a hostname by itself. In this
case, ZConfig passes None as the port.
>>> import ZEO.runzeo
>>> open('conf', 'w').write("""
... <zeo>
... address 127.0.0.1
... </zeo>
... <mappingstorage>
... </mappingstorage>
... """)
>>> options = ZEO.runzeo.ZEOOptions()
>>> options.realize('-C conf'.split())
>>> options.address
('127.0.0.1', None)
>>> rs = ZEO.runzeo.ZEOServer(options)
>>> rs.check_socket()
>>> options.address
('127.0.0.1', 0)
.. cleanup
>>> ZODB.event.notify = old_notify
You can change the address(es) of a client storaage.
We'll start by setting up a server and connecting to it:
>>> import ZEO, ZEO.StorageServer, ZODB.FileStorage, transaction
>>> server = ZEO.StorageServer.StorageServer(
... ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
>>> server.start_thread()
>>> conn = ZEO.connection(server.addr)
>>> client = conn.db().storage
>>> client.is_connected()
True
>>> conn.root()
{}
>>> conn.root.x = 1
>>> transaction.commit()
Now we'll close the server:
>>> server.close()
And wait for the connectin to notice it's disconnected:
>>> wait_until(lambda : not client.is_connected())
Now, we'll restart the server and update the connection:
>>> server = ZEO.StorageServer.StorageServer(
... ('127.0.0.1', 0), {'1': ZODB.FileStorage.FileStorage('t.fs')})
>>> server.start_thread()
>>> client.new_addr(server.addr)
Update with another client:
>>> conn2 = ZEO.connection(server.addr)
>>> conn2.root.x += 1
>>> transaction.commit()
Wait for connect:
>>> wait_until(lambda : client.is_connected())
>>> _ = transaction.begin()
>>> conn.root()
{'x': 2}
.. cleanup
>>> conn.close()
>>> conn2.close()
>>> server.close()
...@@ -28,7 +28,10 @@ subclass it and give it a do-nothing dispatcher "class": ...@@ -28,7 +28,10 @@ subclass it and give it a do-nothing dispatcher "class":
>>> import ZEO.StorageServer >>> import ZEO.StorageServer
>>> class StorageServer(ZEO.StorageServer.StorageServer): >>> class StorageServer(ZEO.StorageServer.StorageServer):
... DispatcherClass = lambda *a, **k: None ... class DispatcherClass:
... __init__ = lambda *a, **kw: None
... class socket:
... getsockname = staticmethod(lambda : 'socket')
We'll create a storage instance and a storage server using it: We'll create a storage instance and a storage server using it:
......
...@@ -40,8 +40,11 @@ class StorageServer(ZEO.StorageServer.StorageServer): ...@@ -40,8 +40,11 @@ class StorageServer(ZEO.StorageServer.StorageServer):
storages = {'1': ZODB.MappingStorage.MappingStorage()} storages = {'1': ZODB.MappingStorage.MappingStorage()}
ZEO.StorageServer.StorageServer.__init__(self, addr, storages, **kw) ZEO.StorageServer.StorageServer.__init__(self, addr, storages, **kw)
def DispatcherClass(*args, **kw):
pass class DispatcherClass:
__init__ = lambda *a, **kw: None
class socket:
getsockname = staticmethod(lambda : 'socket')
class Connection: class Connection:
......
...@@ -898,7 +898,7 @@ def multiple_storages_invalidation_queue_is_not_insane(): ...@@ -898,7 +898,7 @@ def multiple_storages_invalidation_queue_is_not_insane():
>>> sorted([int(u64(oid)) for oid in oids]) >>> sorted([int(u64(oid)) for oid in oids])
[10, 11, 12, 13, 14] [10, 11, 12, 13, 14]
>>> server.close_server() >>> server.close()
""" """
def getInvalidationsAfterServerRestart(): def getInvalidationsAfterServerRestart():
...@@ -962,7 +962,7 @@ need to be invalidated. This means we'll invalidate objects that ...@@ -962,7 +962,7 @@ need to be invalidated. This means we'll invalidate objects that
dont' need to be invalidated, however, that's better than verifying dont' need to be invalidated, however, that's better than verifying
caches.) caches.)
>>> sv.close_server() >>> sv.close()
>>> fs.close() >>> fs.close()
If a storage doesn't implement lastInvalidations, a client can still If a storage doesn't implement lastInvalidations, a client can still
...@@ -1249,7 +1249,7 @@ def runzeo_without_configfile(): ...@@ -1249,7 +1249,7 @@ def runzeo_without_configfile():
------ ------
--T INFO ZEO.zrpc () listening on ... --T INFO ZEO.zrpc () listening on ...
------ ------
--T INFO ZEO.runzeo () closing storage '1' --T INFO ZEO.StorageServer closing storage '1'
testing exit immediately testing exit immediately
""" """
...@@ -1761,6 +1761,7 @@ def test_suite(): ...@@ -1761,6 +1761,7 @@ def test_suite():
'zeo-fan-out.test', 'zdoptions.test', 'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt', 'client-config.test', 'drop_cache_rather_than_verify.txt', 'client-config.test',
'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt', 'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
'dynamic_server_ports.test', 'new_addr.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown, setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
), ),
) )
......
...@@ -94,7 +94,7 @@ class ZEOTestServer(asyncore.dispatcher): ...@@ -94,7 +94,7 @@ class ZEOTestServer(asyncore.dispatcher):
# the ack character until the storage is finished closing. # the ack character until the storage is finished closing.
if self._count <= 0: if self._count <= 0:
self.log('closing the storage') self.log('closing the storage')
self._server.close_server() self._server.close()
if not self._keep: if not self._keep:
for storage in self._server.storages.values(): for storage in self._server.storages.values():
cleanup(storage) cleanup(storage)
...@@ -206,6 +206,7 @@ def main(): ...@@ -206,6 +206,7 @@ def main():
d.start() d.start()
# Loop for socket events # Loop for socket events
log(label, 'entering asyncore loop') log(label, 'entering asyncore loop')
server.start_thread()
asyncore.loop() asyncore.loop()
......
...@@ -146,6 +146,9 @@ class ConnectionManager(object): ...@@ -146,6 +146,9 @@ class ConnectionManager(object):
# attempting to connect. # attempting to connect.
self.thread = None # Protected by self.cond self.thread = None # Protected by self.cond
def new_addrs(self, addrs):
self.addrlist = self._parse_addrs(addrs)
def _start_asyncore_loop(self): def _start_asyncore_loop(self):
self.map = {} self.map = {}
self.trigger = ZEO.zrpc.trigger.trigger(self.map) self.trigger = ZEO.zrpc.trigger.trigger(self.map)
...@@ -269,9 +272,7 @@ class ConnectionManager(object): ...@@ -269,9 +272,7 @@ class ConnectionManager(object):
t = self.thread t = self.thread
if t is None: if t is None:
log("CM.connect(): starting ConnectThread") log("CM.connect(): starting ConnectThread")
self.thread = t = ConnectThread(self, self.client, self.thread = t = ConnectThread(self, self.client)
self.addrlist,
self.tmin, self.tmax)
t.setDaemon(1) t.setDaemon(1)
t.start() t.start()
if sync: if sync:
...@@ -362,13 +363,10 @@ class ConnectThread(threading.Thread): ...@@ -362,13 +363,10 @@ class ConnectThread(threading.Thread):
# We don't expect clients to call any methods of this Thread other # We don't expect clients to call any methods of this Thread other
# than close() and those defined by the Thread API. # than close() and those defined by the Thread API.
def __init__(self, mgr, client, addrlist, tmin, tmax): def __init__(self, mgr, client):
self.__super_init(name="Connect(%s)" % addrlist) self.__super_init(name="Connect(%s)" % mgr.addrlist)
self.mgr = mgr self.mgr = mgr
self.client = client self.client = client
self.addrlist = addrlist
self.tmin = tmin
self.tmax = tmax
self.stopped = 0 self.stopped = 0
self.one_attempt = threading.Event() self.one_attempt = threading.Event()
# A ConnectThread keeps track of whether it has finished a # A ConnectThread keeps track of whether it has finished a
...@@ -380,7 +378,7 @@ class ConnectThread(threading.Thread): ...@@ -380,7 +378,7 @@ class ConnectThread(threading.Thread):
self.stopped = 1 self.stopped = 1
def run(self): def run(self):
delay = self.tmin delay = self.mgr.tmin
success = 0 success = 0
# Don't wait too long the first time. # Don't wait too long the first time.
# TODO: make timeout configurable? # TODO: make timeout configurable?
...@@ -396,11 +394,11 @@ class ConnectThread(threading.Thread): ...@@ -396,11 +394,11 @@ class ConnectThread(threading.Thread):
if self.mgr.is_connected(): if self.mgr.is_connected():
log("CT: still trying to replace fallback connection", log("CT: still trying to replace fallback connection",
level=logging.INFO) level=logging.INFO)
delay = min(delay*2, self.tmax) delay = min(delay*2, self.mgr.tmax)
log("CT: exiting thread: %s" % self.getName()) log("CT: exiting thread: %s" % self.getName())
def try_connecting(self, timeout): def try_connecting(self, timeout):
"""Try connecting to all self.addrlist addresses. """Try connecting to all self.mgr.addrlist addresses.
Return 1 if a preferred connection was found; 0 if no Return 1 if a preferred connection was found; 0 if no
connection was found; and -1 if a fallback connection was connection was found; and -1 if a fallback connection was
...@@ -408,7 +406,7 @@ class ConnectThread(threading.Thread): ...@@ -408,7 +406,7 @@ class ConnectThread(threading.Thread):
If no connection is found within timeout seconds, return 0. If no connection is found within timeout seconds, return 0.
""" """
log("CT: attempting to connect on %d sockets" % len(self.addrlist)) log("CT: attempting to connect on %d sockets" % len(self.mgr.addrlist))
deadline = time.time() + timeout deadline = time.time() + timeout
wrappers = self._create_wrappers() wrappers = self._create_wrappers()
for wrap in wrappers.keys(): for wrap in wrappers.keys():
...@@ -434,7 +432,7 @@ class ConnectThread(threading.Thread): ...@@ -434,7 +432,7 @@ class ConnectThread(threading.Thread):
return 0 return 0
def _expand_addrlist(self): def _expand_addrlist(self):
for domain, addr in self.addrlist: for domain, addr in self.mgr.addrlist:
# AF_INET really means either IPv4 or IPv6, possibly # AF_INET really means either IPv4 or IPv6, possibly
# indirected by DNS. By design, DNS lookup is deferred # indirected by DNS. By design, DNS lookup is deferred
# until connections get established, so that DNS # until connections get established, so that DNS
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
# #
############################################################################## ##############################################################################
import asyncore import asyncore
import errno
import sys import sys
import threading import threading
import logging import logging
...@@ -658,7 +659,11 @@ class ManagedServerConnection(Connection): ...@@ -658,7 +659,11 @@ class ManagedServerConnection(Connection):
def server_loop(map): def server_loop(map):
while len(map) > 1: while len(map) > 1:
try:
asyncore.poll(30.0, map) asyncore.poll(30.0, map)
except Exception, v:
if v.args[0] != errno.EBADF:
raise
for o in map.values(): for o in map.values():
o.close() o.close()
......
...@@ -43,8 +43,8 @@ class Dispatcher(asyncore.dispatcher): ...@@ -43,8 +43,8 @@ class Dispatcher(asyncore.dispatcher):
"""A server that accepts incoming RPC connections""" """A server that accepts incoming RPC connections"""
__super_init = asyncore.dispatcher.__init__ __super_init = asyncore.dispatcher.__init__
def __init__(self, addr, factory=Connection): def __init__(self, addr, factory=Connection, map=None):
self.__super_init() self.__super_init(map=map)
self.addr = addr self.addr = addr
self.factory = factory self.factory = factory
self._open_socket() self._open_socket()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment