Commit 8d5c5603 authored by Hanno Schlichting's avatar Hanno Schlichting

flake8

parent a1f87de3
......@@ -48,7 +48,7 @@ class WSGIResponseTests(unittest.TestCase):
response.finalize()
self.assertEqual(response.getHeader('Content-Length'), '7')
def test_finalize_skips_setting_content_length_if_missing_w_streaming(self):
def test_finalize_skips_content_length_if_missing_w_streaming(self):
response = self._makeOne()
response._streaming = True
response.body = 'TESTING'
......@@ -85,7 +85,7 @@ class WSGIResponseTests(unittest.TestCase):
from ZPublisher.Iterators import IUnboundStreamIterator
from zope.interface import implements
class test_streamiterator:
class TestStreamIterator(object):
implements(IUnboundStreamIterator)
data = "hello"
done = 0
......@@ -98,7 +98,7 @@ class WSGIResponseTests(unittest.TestCase):
response = self._makeOne()
response.setStatus(200)
body = test_streamiterator()
body = TestStreamIterator()
response.setBody(body)
response.finalize()
self.assertTrue(body is response.body)
......@@ -108,7 +108,7 @@ class WSGIResponseTests(unittest.TestCase):
from ZPublisher.Iterators import IStreamIterator
from zope.interface import implements
class test_streamiterator:
class TestStreamIterator(object):
implements(IStreamIterator)
data = "hello"
done = 0
......@@ -124,13 +124,13 @@ class WSGIResponseTests(unittest.TestCase):
response = self._makeOne()
response.setStatus(200)
body = test_streamiterator()
body = TestStreamIterator()
response.setBody(body)
response.finalize()
self.assertTrue(body is response.body)
self.assertEqual(response._streaming, 0)
self.assertEqual(response.getHeader('Content-Length'),
'%d' % len(test_streamiterator.data))
'%d' % len(TestStreamIterator.data))
def test___str___raises(self):
response = self._makeOne()
......@@ -138,7 +138,7 @@ class WSGIResponseTests(unittest.TestCase):
self.assertRaises(NotImplementedError, lambda: str(response))
class Test_publish(unittest.TestCase):
class TestPublish(unittest.TestCase):
def _callFUT(self, request, module_name, _get_module_info=None):
from ZPublisher.WSGIPublisher import publish
......@@ -204,8 +204,8 @@ class Test_publish(unittest.TestCase):
self.assertEqual(response.realm, None)
class Test_publish_module(unittest.TestCase):
class TestPublishModule(unittest.TestCase):
def setUp(self):
from zope.testing.cleanup import cleanUp
......@@ -216,7 +216,7 @@ class Test_publish_module(unittest.TestCase):
cleanUp()
def _callFUT(self, environ, start_response,
_publish=None, _response_factory=None, _request_factory=None):
_publish=None, _response_factory=None, _request_factory=None):
from ZPublisher.WSGIPublisher import publish_module
if _publish is not None:
if _response_factory is not None:
......@@ -228,7 +228,7 @@ class Test_publish_module(unittest.TestCase):
else:
if _request_factory is not None:
return publish_module(environ, start_response, _publish,
_request_factory=_request_factory)
_request_factory=_request_factory)
return publish_module(environ, start_response, _publish)
return publish_module(environ, start_response)
......@@ -245,16 +245,16 @@ class Test_publish_module(unittest.TestCase):
def _makeEnviron(self, **kw):
from StringIO import StringIO
environ = {
'SCRIPT_NAME' : '',
'REQUEST_METHOD' : 'GET',
'QUERY_STRING' : '',
'SERVER_NAME' : '127.0.0.1',
'REMOTE_ADDR': '127.0.0.1',
'wsgi.url_scheme': 'http',
'SERVER_PORT': '8080',
'HTTP_HOST': '127.0.0.1:8080',
'SERVER_PROTOCOL' : 'HTTP/1.1',
'wsgi.input' : StringIO(''),
'SCRIPT_NAME': '',
'REQUEST_METHOD': 'GET',
'QUERY_STRING': '',
'SERVER_NAME': '127.0.0.1',
'REMOTE_ADDR': '127.0.0.1',
'wsgi.url_scheme': 'http',
'SERVER_PORT': '8080',
'HTTP_HOST': '127.0.0.1:8080',
'SERVER_PROTOCOL': 'HTTP/1.1',
'wsgi.input': StringIO(''),
'CONTENT_LENGTH': '0',
'HTTP_CONNECTION': 'keep-alive',
'CONTENT_TYPE': ''
......@@ -338,11 +338,14 @@ class Test_publish_module(unittest.TestCase):
self.assertEqual(kw, {})
def test_response_body_is_file(self):
class DummyFile(file):
def __init__(self):
pass
def read(self, *args, **kw):
raise NotImplementedError()
_response = DummyResponse()
_response._status = '200 OK'
_response._headers = [('Content-Length', '4')]
......@@ -358,7 +361,7 @@ class Test_publish_module(unittest.TestCase):
from ZPublisher.Iterators import IStreamIterator
from zope.interface import implements
class test_streamiterator:
class TestStreamIterator(object):
implements(IStreamIterator)
data = "hello"
done = 0
......@@ -372,7 +375,7 @@ class Test_publish_module(unittest.TestCase):
_response = DummyResponse()
_response._status = '200 OK'
_response._headers = [('Content-Length', '4')]
body = _response.body = test_streamiterator()
body = _response.body = TestStreamIterator()
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
......@@ -384,7 +387,7 @@ class Test_publish_module(unittest.TestCase):
from ZPublisher.Iterators import IUnboundStreamIterator
from zope.interface import implements
class test_unboundstreamiterator:
class TestUnboundStreamIterator(object):
implements(IUnboundStreamIterator)
data = "hello"
done = 0
......@@ -397,7 +400,7 @@ class Test_publish_module(unittest.TestCase):
_response = DummyResponse()
_response._status = '200 OK'
body = _response.body = test_unboundstreamiterator()
body = _response.body = TestUnboundStreamIterator()
environ = self._makeEnviron()
start_response = DummyCallable()
_publish = DummyCallable()
......@@ -410,15 +413,17 @@ class Test_publish_module(unittest.TestCase):
start_response = DummyCallable()
_request = DummyRequest()
_request._closed = False
def _close():
_request._closed = True
_request.close = _close
def _request_factory(stdin, environ, response):
return _request
_publish = DummyCallable()
_publish._result = DummyResponse()
app_iter = self._callFUT(environ, start_response, _publish,
_request_factory=_request_factory)
self._callFUT(environ, start_response, _publish,
_request_factory=_request_factory)
self.assertTrue(_request._closed)
def test_request_not_closed_when_tm_middleware_active(self):
......@@ -429,21 +434,25 @@ class Test_publish_module(unittest.TestCase):
start_response = DummyCallable()
_request = DummyRequest()
_request._closed = False
def _close():
_request._closed = True
_request.close = _close
def _request_factory(stdin, environ, response):
return _request
_publish = DummyCallable()
_publish._result = DummyResponse()
app_iter = self._callFUT(environ, start_response, _publish,
_request_factory=_request_factory)
self._callFUT(environ, start_response, _publish,
_request_factory=_request_factory)
self.assertFalse(_request._closed)
txn = transaction.get()
self.assertTrue(txn in WSGIPublisher._request_closer_for_repoze_tm.requests)
self.assertTrue(
txn in WSGIPublisher._request_closer_for_repoze_tm.requests)
txn.commit()
self.assertTrue(_request._closed)
self.assertFalse(txn in WSGIPublisher._request_closer_for_repoze_tm.requests)
self.assertFalse(
txn in WSGIPublisher._request_closer_for_repoze_tm.requests)
# try again, but this time raise an exception and abort
_request._closed = False
_publish._raise = Exception('oops')
......@@ -452,9 +461,11 @@ class Test_publish_module(unittest.TestCase):
_request_factory=_request_factory)
self.assertFalse(_request._closed)
txn = transaction.get()
self.assertTrue(txn in WSGIPublisher._request_closer_for_repoze_tm.requests)
self.assertTrue(
txn in WSGIPublisher._request_closer_for_repoze_tm.requests)
txn.abort()
self.assertFalse(txn in WSGIPublisher._request_closer_for_repoze_tm.requests)
self.assertFalse(
txn in WSGIPublisher._request_closer_for_repoze_tm.requests)
self.assertTrue(_request._closed)
......@@ -471,6 +482,7 @@ class DummyRequest(dict):
self._traversed = (path, response, validated_hook)
return self._traverse_to
class DummyResponse(object):
debug_mode = False
after_list = ()
......@@ -489,6 +501,7 @@ class DummyResponse(object):
body = property(lambda self: self._body, setBody)
class DummyCallable(object):
_called_with = _raise = _result = None
......@@ -498,19 +511,13 @@ class DummyCallable(object):
raise self._raise
return self._result
class DummyTM(object):
_recorded = _raise = _result = None
def recordMetaData(self, *args):
self._recorded = args
def noopStartResponse(status, headers):
pass
def test_suite():
return unittest.TestSuite((
unittest.makeSuite(WSGIResponseTests),
unittest.makeSuite(Test_publish),
unittest.makeSuite(Test_publish_module),
))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment