Commit 31d41127 authored by Jim Fulton's avatar Jim Fulton

Refactored the asyncio.Server-based server to be more careful about closing down

It appeared that some tests were failing because servers weren't
closing down.  Modified the test forker module and the server to be
more paranoid about closing the server.

This seems to have helped test stability. (Or maybe I accidentally
fixed something while flailing :)).
parent fddb6f85
...@@ -212,7 +212,8 @@ class Acceptor: ...@@ -212,7 +212,8 @@ class Acceptor:
self.event_loop = loop = asyncio.new_event_loop() self.event_loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
if isinstance(addr, tuple): if isinstance(addr, tuple):
cr = loop.create_server(self.factory, addr[0], addr[1], ssl=ssl) cr = loop.create_server(self.factory, addr[0], addr[1],
reuse_address=True, ssl=ssl)
else: else:
cr = loop.create_unix_server(self.factory, addr, ssl=ssl) cr = loop.create_unix_server(self.factory, addr, ssl=ssl)
...@@ -245,12 +246,28 @@ class Acceptor: ...@@ -245,12 +246,28 @@ class Acceptor:
def loop(self, timeout=None): def loop(self, timeout=None):
self.event_loop.run_forever() self.event_loop.run_forever()
self.event_loop.run_until_complete(self.server.wait_closed())
self.event_loop.close() self.event_loop.close()
__closed = False closed = False
def close(self): def close(self):
if not self.__closed: if not self.closed:
self.__closed = True self.closed = True
self.server.close() self.event_loop.call_soon_threadsafe(self._close)
self.event_loop.call_soon_threadsafe(self.event_loop.stop)
def _close(self):
loop = self.event_loop
self.server.close()
f = asyncio.async(self.server.wait_closed(), loop=loop)
@f.add_done_callback
def server_closed(f):
# stop the loop when the server closes:
loop.call_soon(loop.stop)
def timeout():
logger.warning("Timed out closing asyncio.Server")
loop.call_soon(loop.stop)
# But if the server doesn't close in a second, stop the loop anyway.
loop.call_later(1, timeout)
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
############################################################################## ##############################################################################
"""Library for forking storage server and connecting client storage""" """Library for forking storage server and connecting client storage"""
from __future__ import print_function from __future__ import print_function
import gc
import os import os
import random import random
import sys import sys
...@@ -83,7 +84,7 @@ def encode_format(fmt): ...@@ -83,7 +84,7 @@ def encode_format(fmt):
return fmt return fmt
def runner(config, qin, qout, timeout=None, def runner(config, qin, qout, timeout=None,
join_timeout=9, debug=False, name=None, debug=False, name=None,
keep=False, protocol=None): keep=False, protocol=None):
if debug: if debug:
...@@ -120,11 +121,7 @@ def runner(config, qin, qout, timeout=None, ...@@ -120,11 +121,7 @@ def runner(config, qin, qout, timeout=None,
except Empty: except Empty:
pass pass
server.server.close() server.server.close()
thread.join(join_timeout) thread.join(3)
if thread.is_alive():
logger.warning("server thread didn't stop")
else:
logger.debug('server thread stopped')
if not keep: if not keep:
# Try to cleanup storage files # Try to cleanup storage files
...@@ -134,10 +131,11 @@ def runner(config, qin, qout, timeout=None, ...@@ -134,10 +131,11 @@ def runner(config, qin, qout, timeout=None,
except AttributeError: except AttributeError:
pass pass
qout.put('stopped') qout.put(thread.is_alive())
qin.get(timeout=11) # ack
if hasattr(qout, 'close'): if hasattr(qout, 'close'):
qout.close() qout.close()
qout.join_thread() qout.cancel_join_thread()
except Exception: except Exception:
logger.exception("In server thread") logger.exception("In server thread")
...@@ -149,12 +147,25 @@ def runner(config, qin, qout, timeout=None, ...@@ -149,12 +147,25 @@ def runner(config, qin, qout, timeout=None,
def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None): def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None):
qin.put('stop') qin.put('stop')
if hasattr(qin, 'close'): dirty = qout.get(timeout=stop_timeout)
qin.close() qin.put('ack')
qin.join_thread() if dirty:
qout.get(timeout=stop_timeout) print("WARNING SERVER DIDN'T STOP CLEANLY", file=sys.stderr)
# The runner thread didn't stop. If it was a process,
# give it some time to exit
if hasattr(thread, 'pid') and thread.pid:
os.waitpid(thread.pid)
else:
# Gaaaa, force gc in hopes of maybe getting the unclosed
# sockets to get GCed
gc.collect()
thread.join(stop_timeout) thread.join(stop_timeout)
os.remove(config) os.remove(config)
if hasattr(qin, 'close'):
qin.close()
qin.cancel_join_thread()
def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
path='Data.fs', protocol=None, blob_dir=None, path='Data.fs', protocol=None, blob_dir=None,
...@@ -170,6 +181,8 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, ...@@ -170,6 +181,8 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
to the config file. to the config file.
""" """
import logging; logging.basicConfig(level='DEBUG')
if not storage_conf: if not storage_conf:
storage_conf = '<filestorage>\npath %s\n</filestorage>' % path storage_conf = '<filestorage>\npath %s\n</filestorage>' % path
...@@ -217,7 +230,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, ...@@ -217,7 +230,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
thread.start() thread.start()
addr = qout.get(timeout=start_timeout) addr = qout.get(timeout=start_timeout)
def stop(stop_timeout=9): def stop(stop_timeout=99):
stop_runner(thread, tmpfile, qin, qout, stop_timeout) stop_runner(thread, tmpfile, qin, qout, stop_timeout)
return addr, stop return addr, stop
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment