Commit 0e977040 authored by Tres Seaver's avatar Tres Seaver

Merge r107029 from the tseaver-fix_wsgi branch.

- Start test coverage for WSGIPublisher.
parent 5aaaad7b
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
""" Python Object Publisher -- Publish Python objects on web servers """ Python Object Publisher -- Publish Python objects on web servers
""" """
from cStringIO import StringIO from cStringIO import StringIO
import re
import sys import sys
import time import time
...@@ -25,6 +24,12 @@ from ZPublisher.HTTPRequest import HTTPRequest ...@@ -25,6 +24,12 @@ from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.maybe_lock import allocate_lock from ZPublisher.maybe_lock import allocate_lock
from ZPublisher.mapply import mapply from ZPublisher.mapply import mapply
_NOW = None # overwrite for testing
def _now():
if _NOW is not None:
return _NOW
return time.time()
class WSGIResponse(HTTPResponse): class WSGIResponse(HTTPResponse):
"""A response object for WSGI """A response object for WSGI
...@@ -34,57 +39,53 @@ class WSGIResponse(HTTPResponse): ...@@ -34,57 +39,53 @@ class WSGIResponse(HTTPResponse):
Most significantly, streaming is not (yet) supported. Most significantly, streaming is not (yet) supported.
""" """
_streaming = 0 _streaming = 0
_http_version = None
_server_version = None
_http_connection = None
def __str__(self):
def __str__(self,
html_search=re.compile('<html>',re.I).search,
):
if self._wrote: if self._wrote:
if self._chunking: if self._chunking:
return '0\r\n\r\n' return '0\r\n\r\n'
else: else:
return '' return ''
headers=self.headers headers = self.headers
body=self.body body = self.body
# set 204 (no content) status if 200 and response is empty # set 204 (no content) status if 200 and response is empty
# and not streaming # and not streaming
if not headers.has_key('content-type') and \ if ('content-type' not in headers and
not headers.has_key('content-length') and \ 'content-length' not in headers and
not self._streaming and \ not self._streaming and self.status == 200):
self.status == 200:
self.setStatus('nocontent') self.setStatus('nocontent')
# add content length if not streaming # add content length if not streaming
if not headers.has_key('content-length') and \ content_length = headers.get('content-length')
not self._streaming:
self.setHeader('content-length',len(body))
if content_length is None and not self._streaming:
self.setHeader('content-length', len(body))
content_length= headers.get('content-length', None) chunks = []
if content_length>0 : append = chunks.append
self.setHeader('content-length', content_length)
headersl=[]
append=headersl.append
status=headers.get('status', '200 OK')
# status header must come first. # status header must come first.
append("HTTP/%s %s" % (self._http_version or '1.0' , status)) version = self._http_version or '1.0'
if headers.has_key('status'): append("HTTP/%s %d %s" % (version, self.status, self.errmsg))
del headers['status']
# add zserver headers # add zserver headers
if self._server_version is not None:
append('Server: %s' % self._server_version) append('Server: %s' % self._server_version)
append('Date: %s' % build_http_date(time.time()))
if self._http_version=='1.0': append('Date: %s' % build_http_date(_now()))
if self._http_connection=='keep-alive' and \
self.headers.has_key('content-length'): if self._http_version == '1.0':
self.setHeader('Connection','Keep-Alive') if (self._http_connection == 'keep-alive' and
'content-length' in self.headers):
self.setHeader('Connection', 'Keep-Alive')
else: else:
self.setHeader('Connection','close') self.setHeader('Connection', 'close')
# Close the connection if we have been asked to. # Close the connection if we have been asked to.
# Use chunking if streaming output. # Use chunking if streaming output.
...@@ -109,10 +110,17 @@ class WSGIResponse(HTTPResponse): ...@@ -109,10 +110,17 @@ class WSGIResponse(HTTPResponse):
start=l+1 start=l+1
l=key.find('-',start) l=key.find('-',start)
append("%s: %s" % (key, val)) append("%s: %s" % (key, val))
if self.cookies: if self.cookies:
headersl=headersl+self._cookie_list() chunks.extend(self._cookie_list())
headersl[len(headersl):]=[self.accumulated_headers, body]
return "\r\n".join(headersl) for key, value in self.accumulated_headers:
append("%s: %s" % (key, value))
append('') # RFC 2616 mandates empty line between headers and payload
append(body)
return "\r\n".join(chunks)
class Retry(Exception): class Retry(Exception):
......
##############################################################################
#
# Copyright (c) 2009 Zope Foundation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import unittest
class WSGIResponseTests(unittest.TestCase):
_old_NOW = None
def tearDown(self):
if self._old_NOW is not None:
self._setNOW(self._old_NOW)
def _getTargetClass(self):
from ZPublisher.WSGIPublisher import WSGIResponse
return WSGIResponse
def _makeOne(self, *args, **kw):
return self._getTargetClass()(*args, **kw)
def _setNOW(self, value):
from ZPublisher import WSGIPublisher
WSGIPublisher._NOW, self._old_NOW = value, WSGIPublisher._NOW
def test___str__already_wrote_not_chunking(self):
response = self._makeOne()
response._wrote = True
response._chunking = False
self.assertEqual(str(response), '')
def test___str__already_wrote_w_chunking(self):
response = self._makeOne()
response._wrote = True
response._chunking = True
self.assertEqual(str(response), '0\r\n\r\n')
def test___str__sets_204_on_empty_not_streaming(self):
response = self._makeOne()
str(response) # not checking value
self.assertEqual(response.status, 204)
def test___str__sets_204_on_empty_not_streaming_ignores_non_200(self):
response = self._makeOne()
response.setStatus(302)
str(response) # not checking value
self.assertEqual(response.status, 302)
def test___str___sets_content_length_if_missing(self):
response = self._makeOne()
response.setBody('TESTING')
str(response) # not checking value
self.assertEqual(response.getHeader('Content-Length'),
str(len('TESTING')))
def test___str___skips_setting_content_length_if_missing_w_streaming(self):
response = self._makeOne()
response._streaming = True
response.body = 'TESTING'
str(response) # not checking value
self.failIf(response.getHeader('Content-Length'))
def test___str___w_default_http_version(self):
response = self._makeOne()
response.setBody('TESTING')
result = str(response).splitlines()
self.assertEqual(result[0], 'HTTP/1.0 200 OK')
def test___str___w_explicit_http_version(self):
response = self._makeOne()
response.setBody('TESTING')
response._http_version = '1.1'
result = str(response).splitlines()
self.assertEqual(result[0], 'HTTP/1.1 200 OK')
def test___str___skips_Server_header_wo_server_version_set(self):
response = self._makeOne()
response.setBody('TESTING')
result = str(response).splitlines()
sv = [x for x in result if x.lower().startswith('server-version')]
self.failIf(sv)
def test___str___includes_Server_header_w_server_version_set(self):
response = self._makeOne()
response._server_version = 'TESTME'
response.setBody('TESTING')
result = str(response).splitlines()
self.assertEqual(result[1], 'Server: TESTME')
def test___str___includes_Date_header(self):
import time
WHEN = time.localtime()
self._setNOW(time.mktime(WHEN))
response = self._makeOne()
response.setBody('TESTING')
result = str(response).splitlines()
self.assertEqual(result[1], 'Date: %s' %
time.strftime('%a, %d %b %Y %H:%M:%S GMT',
time.gmtime(time.mktime(WHEN))))
def test___str___HTTP_1_0_keep_alive_w_content_length(self):
response = self._makeOne()
response._http_version = '1.0'
response._http_connection = 'keep-alive'
response.setBody('TESTING')
str(response) # not checking value
self.assertEqual(response.getHeader('Connection'), 'Keep-Alive')
def test___str___HTTP_1_0_keep_alive_wo_content_length_streaming(self):
response = self._makeOne()
response._http_version = '1.0'
response._http_connection = 'keep-alive'
response._streaming = True
str(response) # not checking value
self.assertEqual(response.getHeader('Connection'), 'close')
def test___str___HTTP_1_0_not_keep_alive_w_content_length(self):
response = self._makeOne()
response._http_version = '1.0'
response.setBody('TESTING')
str(response) # not checking value
self.assertEqual(response.getHeader('Connection'), 'close')
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(WSGIResponseTests))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment