Commit dc3e07c2 authored by Mark Peek's avatar Mark Peek

Update unit tests

parent aacd5670
python test_accept_many.py
#python test_aio.py
python test_aio.py
python test_condition_variable.py
python test_event_queue.py
python test_in_parallel.py
python test_interrupt.py
python test_isemaphore.py
#python test_lio.py
python test_lio.py
python test_local.py
python test_lru.py
python test_mutex.py
#python test_notify_of_close.py
#python test_poller.py
python test_poller.py
python test_profile.py
#python test_read_stream.py
python test_read_stream.py
#python test_readv.py
python test_rw_lock.py
python test_semaphore.py
......
......@@ -49,6 +49,7 @@ class Test(unittest.TestCase):
self.assertEqual(a, data)
os.ftruncate(self.fd, 0)
@unittest.skipUnless(hasattr(coro, 'aio_write'), "No aio_write in coro")
def test_read_write(self):
"""Test read/write."""
self.fd = os.open('test_lio_file', Test.FLAG)
......@@ -72,6 +73,7 @@ class Test(unittest.TestCase):
os.close(self.fd)
@unittest.skipUnless(hasattr(coro, 'aio_write'), "No aio_write in coro")
def test_leak(self):
"""Test map leak."""
# There was a bug where we were leaking events in the event map.
......@@ -100,6 +102,7 @@ class Test(unittest.TestCase):
self.assertEqual(event_size, len(coro.event_map))
@unittest.skipUnless(hasattr(coro, 'aio_write'), "No aio_write in coro")
def test_error(self):
"""Test error return."""
fd = os.open('test_aio_file', Test.FLAG)
......
......@@ -46,6 +46,7 @@ class Test(unittest.TestCase):
self.assertEqual(a, data)
os.ftruncate(self.fd, 0)
@unittest.skipUnless(hasattr(coro, 'many_lio_writes'), "No many_lio_writes in coro")
def test_read_write(self):
"""Test read/write."""
self.fd = os.open('test_lio_file', os.O_RDWR | os.O_CREAT | os.O_TRUNC)
......
......@@ -21,13 +21,14 @@
"""Unittests for the poller."""
import coro
import coro_process
# import coro_process
import coro_unittest
import os
import unittest
class Test(unittest.TestCase):
@unittest.skip("no coro_process")
def test_wait_for_interrupt_new(self):
# Test KEVENT_STATUS_NEW
proc = coro_process.spawn_job_bg('sleep 30')
......@@ -50,6 +51,7 @@ class Test(unittest.TestCase):
# Even better, after the unittest process exits, if there are no
# core dumps, things are good.
@unittest.skip("no coro_process")
def test_wait_for_interrupt_submitted(self):
# Test KEVENT_STATUS_SUBMITTED
proc = coro_process.spawn_job_bg('sleep 30')
......@@ -114,6 +116,19 @@ class Test(unittest.TestCase):
os.close(read_fd2)
os.close(write_fd2)
def test_with_timeout(self):
def serve (port):
s = coro.tcp_sock()
s.bind (('', port))
s.listen (5)
with self.assertRaises(coro.TimeoutError):
conn, addr = coro.with_timeout(1, s.accept)
# do this a second time to make sure no SimultaneousErrors occur
with self.assertRaises(coro.TimeoutError):
conn, addr = coro.with_timeout(1, s.accept)
coro.spawn(serve, 8100)
coro.yield_slice()
coro.sleep_relative(3)
if __name__ == '__main__':
coro_unittest.run_tests()
# -*- Mode: Python -*-
"""Unittest for with_timeout() call."""
import time
import unittest
import os
import coro
import coro_unittest
class TestWithTimeout (unittest.TestCase):
def test_with_timeout (self):
def go():
print "timer"
with self.assertRaises(coro.TimeoutError):
#coro.with_timeout(2, coro.sleep_relative, 4)
coro.with_timeout(2, coro.waitpid, os.getpid())
print "foo"
coro.spawn(go)
for i in range(5):
coro.yield_slice()
coro.sleep_relative(1)
if __name__ == '__main__':
coro_unittest.run_tests()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment