Commit 4895e57d authored by Tres Seaver's avatar Tres Seaver

Remove guard against Python 2.5.

parent 4a3ffb9e
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
############################################################################## ##############################################################################
"""Test suite for ZEO based on ZODB.tests.""" """Test suite for ZEO based on ZODB.tests."""
from __future__ import print_function from __future__ import print_function
import multiprocessing
import re import re
from ZEO.ClientStorage import ClientStorage from ZEO.ClientStorage import ClientStorage
...@@ -1562,48 +1563,42 @@ if sys.platform.startswith('win'): ...@@ -1562,48 +1563,42 @@ if sys.platform.startswith('win'):
del runzeo_logrotate_on_sigusr2 del runzeo_logrotate_on_sigusr2
del unix_domain_sockets del unix_domain_sockets
if sys.version_info >= (2, 6): def work_with_multiprocessing_process(name, addr, q):
import multiprocessing conn = ZEO.connection(addr)
q.put((name, conn.root.x))
conn.close()
def work_with_multiprocessing_process(name, addr, q): class MultiprocessingTests(unittest.TestCase):
layer = ZODB.tests.util.MininalTestLayer('work_with_multiprocessing')
def test_work_with_multiprocessing(self):
"Client storage should work with multi-processing."
# Gaaa, zope.testing.runner.FakeInputContinueGenerator has no close
if not hasattr(sys.stdin, 'close'):
sys.stdin.close = lambda : None
if not hasattr(sys.stdin, 'fileno'):
sys.stdin.fileno = lambda : -1
self.globs = {}
forker.setUp(self)
addr, adminaddr = self.globs['start_server']()
conn = ZEO.connection(addr) conn = ZEO.connection(addr)
q.put((name, conn.root.x)) conn.root.x = 1
transaction.commit()
q = multiprocessing.Queue()
processes = [multiprocessing.Process(
target=work_with_multiprocessing_process,
args=(i, addr, q))
for i in range(3)]
_ = [p.start() for p in processes]
self.assertEqual(sorted(q.get(timeout=300) for p in processes),
[(0, 1), (1, 1), (2, 1)])
_ = [p.join(30) for p in processes]
conn.close() conn.close()
zope.testing.setupstack.tearDown(self)
class MultiprocessingTests(unittest.TestCase):
layer = ZODB.tests.util.MininalTestLayer('work_with_multiprocessing')
def test_work_with_multiprocessing(self):
"Client storage should work with multi-processing."
# Gaaa, zope.testing.runner.FakeInputContinueGenerator has no close
if not hasattr(sys.stdin, 'close'):
sys.stdin.close = lambda : None
if not hasattr(sys.stdin, 'fileno'):
sys.stdin.fileno = lambda : -1
self.globs = {}
forker.setUp(self)
addr, adminaddr = self.globs['start_server']()
conn = ZEO.connection(addr)
conn.root.x = 1
transaction.commit()
q = multiprocessing.Queue()
processes = [multiprocessing.Process(
target=work_with_multiprocessing_process,
args=(i, addr, q))
for i in range(3)]
_ = [p.start() for p in processes]
self.assertEqual(sorted(q.get(timeout=300) for p in processes),
[(0, 1), (1, 1), (2, 1)])
_ = [p.join(30) for p in processes]
conn.close()
zope.testing.setupstack.tearDown(self)
else:
class MultiprocessingTests(unittest.TestCase):
pass
def quick_close_doesnt_kill_server(): def quick_close_doesnt_kill_server():
r""" r"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment