Commit a4c4010a authored by Jeremy Hylton's avatar Jeremy Hylton

Add rudiments of a ZEO test framework for Windows

winserver.py: A small script that runs a ZEO StorageServer and a ZEOTestServer.
    The ZEOTestServer is used to kill the StorageServer during a unit test's
    teardown phases.  As soon as a client connects to that server, it exits
    the process.

forker.py: Now forks and spawns, depending on os.
    XXX This needs to be cleaned up a lot, since the interfaces on Windows and
    Unix are nothing alike.

testZEO.py:  Add WindowsGenericTests and WindowsZEOFileStorageTests.
    These test cases use the Windows-specific forker interface.  They also
    use getStorageInfo() to describe the storage, rather than getStorage()
    to create the storage.  This is necessary because the storage instance
    is created in a separate process.
parent 3b5501cf
......@@ -4,7 +4,10 @@ import asyncore
import atexit
import os
import profile
import random
import socket
import sys
import threading
import time
import types
import ThreadedAsync
......@@ -12,80 +15,98 @@ import ZEO.ClientStorage, ZEO.StorageServer
PROFILE = 0
class ZEOServerExit(asyncore.file_dispatcher):
"""Used to exit ZEO.StorageServer when run is done"""
if os.name == "nt":
def writable(self):
return 0
def start_zeo_server(storage_name, args):
"""Start a ZEO server in a separate process.
def readable(self):
return 1
Returns the ZEO port, the test server port, and the pid.
"""
import ZEO.tests.winserver
port = random.randrange(20000, 30000)
script = ZEO.tests.winserver.__file__
if script.endswith('.pyc'):
script = script[:-1]
args = (sys.executable, script, str(port), storage_name) + args
pid = os.spawnv(os.P_NOWAIT, sys.executable, args)
return ('localhost', port), ('localhost', port + 1), pid
def handle_read(self):
buf = self.recv(4)
if buf:
assert buf == "done"
else:
class ZEOServerExit(asyncore.file_dispatcher):
"""Used to exit ZEO.StorageServer when run is done"""
def writable(self):
return 0
def readable(self):
return 1
def handle_read(self):
buf = self.recv(4)
if buf:
assert buf == "done"
asyncore.socket_map.clear()
def handle_close(self):
asyncore.socket_map.clear()
def handle_close(self):
asyncore.socket_map.clear()
class ZEOClientExit:
"""Used by client to cause server to exit"""
def __init__(self, pipe):
self.pipe = pipe
def close(self):
os.write(self.pipe, "done")
def start_zeo_server(storage, addr):
rd, wr = os.pipe()
pid = os.fork()
if pid == 0:
if PROFILE:
p = profile.Profile()
p.runctx("run_server(storage, addr, rd, wr)", globals(),
locals())
p.dump_stats("stats.s.%d" % os.getpid())
class ZEOClientExit:
"""Used by client to cause server to exit"""
def __init__(self, pipe):
self.pipe = pipe
def close(self):
os.write(self.pipe, "done")
def start_zeo_server(storage, addr):
rd, wr = os.pipe()
pid = os.fork()
if pid == 0:
if PROFILE:
p = profile.Profile()
p.runctx("run_server(storage, addr, rd, wr)", globals(),
locals())
p.dump_stats("stats.s.%d" % os.getpid())
else:
run_server(storage, addr, rd, wr)
os._exit(0)
else:
os.close(rd)
return pid, ZEOClientExit(wr)
def run_server(storage, addr, rd, wr):
# in the child, run the storage server
os.close(wr)
ZEOServerExit(rd)
serv = ZEO.StorageServer.StorageServer(addr, {'1':storage})
asyncore.loop()
storage.close()
if isinstance(addr, types.StringType):
os.unlink(addr)
def start_zeo(storage, cache=None, cleanup=None, domain="AF_INET",
storage_id="1", cache_size=20000000):
"""Setup ZEO client-server for storage.
Returns a ClientStorage instance and a ZEOClientExit instance.
XXX Don't know if os.pipe() will work on Windows.
"""
if domain == "AF_INET":
import random
addr = '', random.randrange(2000, 3000)
elif domain == "AF_UNIX":
import tempfile
addr = tempfile.mktemp()
else:
run_server(storage, addr, rd, wr)
os._exit(0)
else:
os.close(rd)
return pid, ZEOClientExit(wr)
def run_server(storage, addr, rd, wr):
# in the child, run the storage server
os.close(wr)
ZEOServerExit(rd)
serv = ZEO.StorageServer.StorageServer(addr, {'1':storage})
asyncore.loop()
storage.close()
if isinstance(addr, types.StringType):
os.unlink(addr)
def start_zeo(storage, cache=None, cleanup=None, domain="AF_INET",
storage_id="1", cache_size=20000000):
"""Setup ZEO client-server for storage.
Returns a ClientStorage instance and a ZEOClientExit instance.
XXX Don't know if os.pipe() will work on Windows.
"""
if domain == "AF_INET":
import random
addr = '', random.randrange(2000, 3000)
elif domain == "AF_UNIX":
import tempfile
addr = tempfile.mktemp()
else:
raise ValueError, "bad domain: %s" % domain
pid, exit = start_zeo_server(storage, addr)
s = ZEO.ClientStorage.ClientStorage(addr, storage_id,
debug=1, client=cache,
cache_size=cache_size,
min_disconnect_poll=0.5)
return s, exit, pid
raise ValueError, "bad domain: %s" % domain
pid, exit = start_zeo_server(storage, addr)
s = ZEO.ClientStorage.ClientStorage(addr, storage_id,
debug=1, client=cache,
cache_size=cache_size,
min_disconnect_poll=0.5)
return s, exit, pid
......@@ -3,6 +3,7 @@
import asyncore
import os
import random
import socket
import sys
import tempfile
import time
......@@ -163,7 +164,57 @@ class ZEOFileStorageTests(GenericTests):
# file storage appears to create four files
for ext in '', '.index', '.lock', '.tmp':
path = self.__fs_base + ext
os.unlink(path)
try:
os.remove(path)
except os.error:
pass
class WindowsGenericTests(GenericTests):
"""Subclass to support server creation on Windows.
On Windows, the getStorage() design won't work because the storage
can't be created in the parent process and passed to the child.
All the work has to be done in the server's process.
"""
__super_setUp = StorageTestBase.StorageTestBase.setUp
__super_tearDown = StorageTestBase.StorageTestBase.tearDown
def setUp(self):
self.__super_setUp()
args = self.getStorageInfo()
name = args[0]
args = args[1:]
zeo_addr, self.test_addr, self.test_pid = \
forker.start_zeo_server(name, args)
storage = ZEO.ClientStorage.ClientStorage(zeo_addr, debug=1,
min_disconnect_poll=0.5)
self._storage = PackWaitWrapper(storage)
storage.registerDB(DummyDB(), None)
def tearDown(self):
self._storage.close()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(self.test_addr)
# the connection should cause the storage server to die
## os.waitpid(self.test_pid, 0)
time.sleep(0.5)
self.delStorage()
self.__super_tearDown()
class WindowsZEOFileStorageTests(WindowsGenericTests):
def getStorageInfo(self):
self.__fs_base = tempfile.mktemp()
return 'FileStorage', self.__fs_base, '1'
def delStorage(self):
# file storage appears to create four files
for ext in '', '.index', '.lock', '.tmp':
path = self.__fs_base + ext
try:
os.remove(path)
except os.error:
pass
class ConnectionTests(ZEOTestBase):
"""Tests that explicitly manage the server process.
......@@ -296,6 +347,13 @@ def get_methods(klass):
meth[k] = 1
return meth.keys()
if os.name == "posix":
test_classes = ZEOFileStorageTests, ConnectionTests
elif os.name == "nt":
test_classes = WindowsZEOFileStorageTests
else:
raise RuntimeError, "unsupported os: %s" % os.name
def makeTestSuite(testname=''):
suite = unittest.TestSuite()
name = 'check' + testname
......@@ -305,6 +363,9 @@ def makeTestSuite(testname=''):
suite.addTest(klass(meth))
return suite
def test_suite():
return unittest.makeSuite(WindowsZEOFileStorageTests, 'check')
def main():
import sys, getopt
......
"""Helper file used to launch ZEO server for Windows tests"""
import asyncore
import os
import random
import socket
import threading
import types
import ZEO.StorageServer
class ZEOTestServer(asyncore.dispatcher):
"""A trivial server for killing a server at the end of a test
The server calls os._exit() as soon as it is connected to. No
chance to even send some data down the socket.
"""
__super_init = asyncore.dispatcher.__init__
def __init__(self, addr):
self.__super_init()
if type(addr) == types.StringType:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind(addr)
self.listen(5)
def handle_accept(self):
sock, addr = self.accept()
os._exit(0)
def load_storage_class(name):
package = __import__("ZODB." + name)
mod = getattr(package, name)
return getattr(mod, name)
def main(port, storage_name, args):
klass = load_storage_class(storage_name)
storage = klass(*args)
zeo_port = int(port)
test_port = zeo_port + 1
t = ZEOTestServer(('', test_port))
## t = threading.Thread(target=ZEOTestServer, args=(('', test_port),))
## t.start()
serv = ZEO.StorageServer.StorageServer(('', zeo_port), {'1': storage})
asyncore.loop()
if __name__ == "__main__":
import sys
main(sys.argv[1], sys.argv[2], sys.argv[3:])
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment