Commit 49bc8e46 authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #72 from zopefoundation/uvloop-server

Use uvloop in the single-threaded server
parents cc26ec38 28914ad2
...@@ -2,8 +2,13 @@ from .._compat import PY3 ...@@ -2,8 +2,13 @@ from .._compat import PY3
if PY3: if PY3:
import asyncio import asyncio
try:
from uvloop import new_event_loop
except ImportError:
from asyncio import new_event_loop
else: else:
import trollius as asyncio import trollius as asyncio
from trollius import new_event_loop
import json import json
import logging import logging
...@@ -223,7 +228,7 @@ class Acceptor(object): ...@@ -223,7 +228,7 @@ class Acceptor(object):
self.storage_server = storage_server self.storage_server = storage_server
self.addr = addr self.addr = addr
self.ssl_context = ssl self.ssl_context = ssl
self.event_loop = loop = asyncio.new_event_loop() self.event_loop = loop = new_event_loop()
if isinstance(addr, tuple): if isinstance(addr, tuple):
cr = loop.create_server(self.factory, addr[0], addr[1], cr = loop.create_server(self.factory, addr[0], addr[1],
......
...@@ -10,7 +10,7 @@ ZEO includes a script that provides a nagios monitor plugin: ...@@ -10,7 +10,7 @@ ZEO includes a script that provides a nagios monitor plugin:
In it's simplest form, the script just checks if it can get status: In it's simplest form, the script just checks if it can get status:
>>> import ZEO >>> import ZEO
>>> addr, stop = ZEO.server('test.fs') >>> addr, stop = ZEO.server('test.fs', threaded=False)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port >>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr]) >>> nagios([saddr])
...@@ -39,7 +39,7 @@ The monitor will optionally output server metric data. There are 2 ...@@ -39,7 +39,7 @@ The monitor will optionally output server metric data. There are 2
kinds of metrics it can output, level and rate metric. If we use the kinds of metrics it can output, level and rate metric. If we use the
-m/--output-metrics option, we'll just get rate metrics: -m/--output-metrics option, we'll just get rate metrics:
>>> addr, stop = ZEO.server('test.fs') >>> addr, stop = ZEO.server('test.fs', threaded=False)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port >>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr, '-m']) >>> nagios([saddr, '-m'])
OK|active_txns=0 OK|active_txns=0
...@@ -115,7 +115,7 @@ profixes metrics with a storage id. ...@@ -115,7 +115,7 @@ profixes metrics with a storage id.
... </mappingstorage> ... </mappingstorage>
... <mappingstorage second> ... <mappingstorage second>
... </mappingstorage> ... </mappingstorage>
... """) ... """, threaded=False)
>>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port >>> saddr = ':'.join(map(str, addr)) # (host, port) -> host:port
>>> nagios([saddr, '-m', '-sstatus']) >>> nagios([saddr, '-m', '-sstatus'])
Empty storage u'first'|first:active_txns=0 Empty storage u'first'|first:active_txns=0
......
...@@ -71,7 +71,7 @@ is generated before the cache is dropped or the message is logged. ...@@ -71,7 +71,7 @@ is generated before the cache is dropped or the message is logged.
Now, we'll restart the server on the original address: Now, we'll restart the server on the original address:
>>> _, admin = start_server(zeo_conf=dict(invalidation_queue_size=1), >>> _, admin = start_server(zeo_conf=dict(invalidation_queue_size=1),
... addr=addr, keep=1, threaded=True) ... addr=addr, keep=1)
>>> wait_connected(db.storage) >>> wait_connected(db.storage)
......
...@@ -4,7 +4,7 @@ We'll start by setting up a server and connecting to it: ...@@ -4,7 +4,7 @@ We'll start by setting up a server and connecting to it:
>>> import ZEO, transaction >>> import ZEO, transaction
>>> addr, stop = ZEO.server(path='test.fs') >>> addr, stop = ZEO.server(path='test.fs', threaded=False)
>>> conn = ZEO.connection(addr) >>> conn = ZEO.connection(addr)
>>> client = conn.db().storage >>> client = conn.db().storage
>>> client.is_connected() >>> client.is_connected()
...@@ -24,7 +24,7 @@ And wait for the connectin to notice it's disconnected: ...@@ -24,7 +24,7 @@ And wait for the connectin to notice it's disconnected:
Now, we'll restart the server: Now, we'll restart the server:
>>> addr, stop = ZEO.server(path='test.fs') >>> addr, stop = ZEO.server(path='test.fs', threaded=False)
Update with another client: Update with another client:
......
...@@ -19,6 +19,7 @@ from zope.testing import setupstack ...@@ -19,6 +19,7 @@ from zope.testing import setupstack
from ZODB.config import storageFromString from ZODB.config import storageFromString
from .forker import start_zeo_server from .forker import start_zeo_server
from .threaded import threaded_server_tests
class ZEOConfigTestBase(setupstack.TestCase): class ZEOConfigTestBase(setupstack.TestCase):
...@@ -120,4 +121,6 @@ class ZEOConfigTest(ZEOConfigTestBase): ...@@ -120,4 +121,6 @@ class ZEOConfigTest(ZEOConfigTestBase):
blob_cache_size_check=50) blob_cache_size_check=50)
def test_suite(): def test_suite():
return unittest.makeSuite(ZEOConfigTest) suite = unittest.makeSuite(ZEOConfigTest)
suite.layer = threaded_server_tests
return suite
...@@ -1609,15 +1609,6 @@ def test_suite(): ...@@ -1609,15 +1609,6 @@ def test_suite():
globs={'print_function': print_function}, globs={'print_function': print_function},
), ),
) )
if not forker.ZEO4_SERVER:
zeo.addTest(
doctest.DocFileSuite(
'dynamic_server_ports.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function},
),
)
zeo.addTest(PackableStorage.IExternalGC_suite( zeo.addTest(PackableStorage.IExternalGC_suite(
lambda : lambda :
ServerManagingClientStorageForIExternalGCTest( ServerManagingClientStorageForIExternalGCTest(
...@@ -1655,6 +1646,17 @@ def test_suite(): ...@@ -1655,6 +1646,17 @@ def test_suite():
suite.addTest(ZODB.tests.testblob.storage_reusable_suite( suite.addTest(ZODB.tests.testblob.storage_reusable_suite(
'ClientStorageSharedBlobs', create_storage_shared)) 'ClientStorageSharedBlobs', create_storage_shared))
if not forker.ZEO4_SERVER:
from .threaded import threaded_server_tests
dynamic_server_ports_suite = doctest.DocFileSuite(
'dynamic_server_ports.test',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
checker=renormalizing.RENormalizing(patterns),
globs={'print_function': print_function},
)
dynamic_server_ports_suite.layer = threaded_server_tests
suite.addTest(dynamic_server_ports_suite)
return suite return suite
......
...@@ -497,7 +497,7 @@ def test_prefetch(self): ...@@ -497,7 +497,7 @@ def test_prefetch(self):
>>> count = 999 >>> count = 999
>>> import ZEO >>> import ZEO
>>> addr, stop = ZEO.server() >>> addr, stop = ZEO.server(threaded=False)
>>> conn = ZEO.connection(addr) >>> conn = ZEO.connection(addr)
>>> root = conn.root() >>> root = conn.root()
>>> cls = root.__class__ >>> cls = root.__class__
......
...@@ -9,6 +9,7 @@ import unittest ...@@ -9,6 +9,7 @@ import unittest
import ZEO.StorageServer import ZEO.StorageServer
from . import forker from . import forker
from .threaded import threaded_server_tests
@unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL") @unittest.skipIf(forker.ZEO4_SERVER, "ZEO4 servers don't support SSL")
class ClientAuthTests(setupstack.TestCase): class ClientAuthTests(setupstack.TestCase):
...@@ -54,5 +55,7 @@ class ClientAuthTests(setupstack.TestCase): ...@@ -54,5 +55,7 @@ class ClientAuthTests(setupstack.TestCase):
stop() stop()
def test_suite(): def test_suite():
return unittest.makeSuite(ClientAuthTests) suite = unittest.makeSuite(ClientAuthTests)
suite.layer = threaded_server_tests
return suite
...@@ -111,7 +111,7 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase): ...@@ -111,7 +111,7 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase):
def test_client_side(self): def test_client_side(self):
# First, traditional: # First, traditional:
addr, stop = ZEO.server('data.fs') addr, stop = ZEO.server('data.fs', threaded=False)
db = ZEO.DB(addr) db = ZEO.DB(addr)
with db.transaction() as conn: with db.transaction() as conn:
conn.root.l = Length(0) conn.root.l = Length(0)
...@@ -130,6 +130,7 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase): ...@@ -130,6 +130,7 @@ class ClientSideConflictResolutionTests(zope.testing.setupstack.TestCase):
addr2, stop = ZEO.server( addr2, stop = ZEO.server(
storage_conf='<mappingstorage>\n</mappingstorage>\n', storage_conf='<mappingstorage>\n</mappingstorage>\n',
zeo_conf=dict(client_conflict_resolution=True), zeo_conf=dict(client_conflict_resolution=True),
threaded=False,
) )
db = ZEO.DB(addr2) db = ZEO.DB(addr2)
......
...@@ -11,6 +11,7 @@ from .. import runzeo ...@@ -11,6 +11,7 @@ from .. import runzeo
from .testConfig import ZEOConfigTestBase from .testConfig import ZEOConfigTestBase
from . import forker from . import forker
from .threaded import threaded_server_tests
here = os.path.dirname(__file__) here = os.path.dirname(__file__)
server_cert = os.path.join(here, 'server.pem') server_cert = os.path.join(here, 'server.pem')
...@@ -121,6 +122,7 @@ class SSLConfigTest(ZEOConfigTestBase): ...@@ -121,6 +122,7 @@ class SSLConfigTest(ZEOConfigTestBase):
@mock.patch(('asyncio' if PY3 else 'trollius') + '.set_event_loop') @mock.patch(('asyncio' if PY3 else 'trollius') + '.set_event_loop')
@mock.patch(('asyncio' if PY3 else 'trollius') + '.new_event_loop') @mock.patch(('asyncio' if PY3 else 'trollius') + '.new_event_loop')
@mock.patch('ZEO.asyncio.client.new_event_loop') @mock.patch('ZEO.asyncio.client.new_event_loop')
@mock.patch('ZEO.asyncio.server.new_event_loop')
class SSLConfigTestMockiavellian(ZEOConfigTestBase): class SSLConfigTestMockiavellian(ZEOConfigTestBase):
@mock.patch('ssl.create_default_context') @mock.patch('ssl.create_default_context')
...@@ -335,10 +337,12 @@ pwfunc = lambda : '1234' ...@@ -335,10 +337,12 @@ pwfunc = lambda : '1234'
def test_suite(): def test_suite():
return unittest.TestSuite(( suite = unittest.TestSuite((
unittest.makeSuite(SSLConfigTest), unittest.makeSuite(SSLConfigTest),
unittest.makeSuite(SSLConfigTestMockiavellian), unittest.makeSuite(SSLConfigTestMockiavellian),
)) ))
suite.layer = threaded_server_tests
return suite
# Helpers for other tests: # Helpers for other tests:
......
"""Test layer for threaded-server tests
uvloop currently has a bug,
https://github.com/MagicStack/uvloop/issues/39, that causes failure if
multiprocessing and threaded servers are mixed in the same
application, so we isolate the few threaded tests in their own layer.
"""
import ZODB.tests.util
threaded_server_tests = ZODB.tests.util.MininalTestLayer(
'threaded_server_tests')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment