Commit 5aaaad7b authored by Tres Seaver's avatar Tres Seaver

Merge -r 106827-106988 from the tseaver-fix_wsgi branch.

- 100% coverage for 'ZPublisher.HTTPResponse'.

- Stop dancing the status / errmsg into / out of the headers list -- they
  aren't "headers" in any practical sense.

- Conform to PEP 8.

- Normalize imports, avoiding BBB import names.
parent 3a00c170
......@@ -185,7 +185,7 @@ def http(request_string, handle_errors=True):
header_output.setResponseStatus(response.getStatus(), response.errmsg)
header_output.setResponseHeaders(response.headers)
header_output.appendResponseHeaders(response._cookie_list())
header_output.appendResponseHeaders(response.accumulated_headers.splitlines())
header_output.appendResponseHeaders(response.accumulated_headers)
sync()
......
##############################################################################
#
# Copyright (c) 2001 Zope Foundation and Contributors.
# Copyright (c) 2001-2009 Zope Foundation and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
......@@ -10,23 +10,30 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
'''CGI Response Output formatter
$Id$'''
""" CGI Response Output formatter
"""
from cgi import escape
import os
import re
from string import maketrans
from string import translate
import struct
import sys
import types
from urllib import quote
import zlib
import types, os, sys, re
import zlib, struct
from string import translate, maketrans
from zope.event import notify
from BaseResponse import BaseResponse
from zExceptions import Unauthorized, Redirect
from zExceptions import Redirect
from zExceptions import Unauthorized
from zExceptions.ExceptionFormatter import format_exception
from ZPublisher import BadRequest, InternalError, NotFound
from ZPublisher import BadRequest
from ZPublisher import InternalError
from ZPublisher import NotFound
from ZPublisher.BaseResponse import BaseResponse
from ZPublisher.pubevents import PubBeforeStreaming
from cgi import escape
from urllib import quote
nl2sp = maketrans('\n',' ')
nl2sp = maketrans('\n', ' ')
# This may get overwritten during configuration
default_encoding = 'iso-8859-15'
......@@ -104,9 +111,6 @@ status_codes['resourcelockederror'] = 423
start_of_header_search = re.compile('(<head[^>]*>)', re.IGNORECASE).search
accumulate_header = {'set-cookie': 1}.has_key
_gzip_header = ("\037\213" # magic
"\010" # compression method
"\000" # flags
......@@ -130,8 +134,7 @@ def _scrubHeader(name, value):
return ''.join(_CRLF.split(str(name))), ''.join(_CRLF.split(str(value)))
class HTTPResponse(BaseResponse):
"""\
An object representation of an HTTP response.
""" An object representation of an HTTP response.
The Response type encapsulates all possible responses to HTTP
requests. Responses are normally created by the object publisher.
......@@ -150,8 +153,8 @@ class HTTPResponse(BaseResponse):
passed into the object must be used.
""" #'
accumulated_headers = ''
body = ''
base = ''
realm = 'Zope'
_error_format = 'text/html'
_locked_status = 0
......@@ -163,61 +166,48 @@ class HTTPResponse(BaseResponse):
# 2 - ignore accept-encoding (i.e. force)
use_HTTP_content_compression = 0
def __init__(self,body='',status=200,headers=None,
stdout=sys.stdout, stderr=sys.stderr,):
'''\
Creates a new response. In effect, the constructor calls
"self.setBody(body); self.setStatus(status); for name in
headers.keys(): self.setHeader(name, headers[name])"
'''
def __init__(self,
body='',
status=200,
headers=None,
stdout=sys.stdout,
stderr=sys.stderr,
):
""" Create a new response using the given values.
"""
if headers is None:
headers = {}
self.headers = headers
self.accumulated_headers = []
if status == 200:
self.status = 200
self.errmsg = 'OK'
headers['status'] = "200 OK"
else:
self.setStatus(status)
self.base = ''
if body:
self.setBody(body)
self.cookies = {}
self.stdout = stdout
self.stderr = stderr
def retry(self):
"""Return a response object to be used in a retry attempt
""" Return a cloned response object to be used in a retry attempt.
"""
# This implementation is a bit lame, because it assumes that
# only stdout stderr were passed to the constructor. OTOH, I
# think that that's all that is ever passed.
return self.__class__(stdout=self.stdout, stderr=self.stderr)
_shutdown_flag = None
def _requestShutdown(self, exitCode=0):
"""Request that the server shut down with exitCode after fulfilling
the current request."""
import ZServer
ZServer.exit_code = exitCode
self._shutdown_flag = 1
def _shutdownRequested(self):
"""Returns true if this request requested a server shutdown."""
return self._shutdown_flag is not None
def setStatus(self, status, reason=None, lock=None):
'''\
Sets the HTTP status code of the response; the argument may
either be an integer or a string from { OK, Created, Accepted,
NoContent, MovedPermanently, MovedTemporarily,
NotModified, BadRequest, Unauthorized, Forbidden,
NotFound, InternalError, NotImplemented, BadGateway,
ServiceUnavailable } that will be converted to the correct
integer value. '''
""" Set the HTTP status code of the response
o The argument may either be an integer or a string from the
'status_reasons' dict values: status messages will be converted
to the correct integer value.
"""
if self._locked_status:
# Don't change the response status.
# It has already been determined.
......@@ -242,50 +232,191 @@ class HTTPResponse(BaseResponse):
reason = status_reasons[status]
else:
reason = 'Unknown'
self.setHeader('Status', "%d %s" % (status,str(reason)))
self.errmsg = reason
# lock the status if we're told to
if lock:
self._locked_status = 1
def setCookie(self, name, value, quoted=True, **kw):
""" Set an HTTP cookie.
The response will include an HTTP header that sets a cookie on
cookie-enabled browsers with a key "name" and value
"value".
This value overwrites any previously set value for the
cookie in the Response object.
"""
name = str(name)
value = str(value)
cookies = self.cookies
if cookies.has_key(name):
cookie = cookies[name]
else:
cookie = cookies[name] = {}
for k, v in kw.items():
cookie[k] = v
cookie['value'] = value
cookie['quoted'] = quoted
def appendCookie(self, name, value):
""" Set an HTTP cookie.
Returns an HTTP header that sets a cookie on cookie-enabled
browsers with a key "name" and value "value". If a value for the
cookie has previously been set in the response object, the new
value is appended to the old one separated by a colon.
"""
name = str(name)
value = str(value)
cookies = self.cookies
if cookies.has_key(name):
cookie = cookies[name]
else:
cookie = cookies[name] = {}
if cookie.has_key('value'):
cookie['value'] = '%s:%s' % (cookie['value'], value)
else:
cookie['value'] = value
def expireCookie(self, name, **kw):
""" Clear an HTTP cookie.
The response will include an HTTP header that will remove the cookie
corresponding to "name" on the client, if one exists. This is
accomplished by sending a new cookie with an expiration date
that has already passed. Note that some clients require a path
to be specified - this path must exactly match the path given
when creating the cookie. The path can be specified as a keyword
argument.
"""
name = str(name)
d = kw.copy()
if 'value' in d:
d.pop('value')
d['max_age'] = 0
d['expires'] = 'Wed, 31-Dec-97 23:59:59 GMT'
self.setCookie(name, value='deleted', **d)
def getHeader(self, name, literal=0):
""" Get a previously set header value.
Return the value associated with a HTTP return header, or
None if no such header has been set in the response
yet.
If the 'literal' flag is true, preserve the case of the header name;
otherwise lower-case the header name before looking up the value.
"""
key = literal and name or name.lower()
return self.headers.get(key, None)
def setHeader(self, name, value, literal=0, scrubbed=False):
'''\
Sets an HTTP return header "name" with value "value", clearing
the previous value set for the header, if one exists. If the
literal flag is true, the case of the header name is preserved,
otherwise the header name will be lowercased.'''
""" Set an HTTP return header on the response.
Replay any existing value set for the header.
If the 'literal' flag is true, preserve the case of the header name;
otherwise the header name will be lowercased.
'scrubbed' is for internal use, to indicate that another API has
already removed any CRLF from the name and value.
"""
if not scrubbed:
name, value = _scrubHeader(name, value)
key = name.lower()
if accumulate_header(key):
self.accumulated_headers = (
"%s%s: %s\r\n" % (self.accumulated_headers, name, value))
return
# The following is crazy, given that we have APIs for cookies.
# Special behavior will go away in Zope 2.13
if key == 'set-cookie':
self.accumulated_headers.append((name, value))
else:
name = literal and name or key
self.headers[name] = value
def getHeader(self, name, literal=0):
'''\
Get a header value
def appendHeader(self, name, value, delimiter=","):
""" Append a value to an HTTP return header.
Returns the value associated with a HTTP return header, or
"None" if no such header has been set in the response
yet. If the literal flag is true, the case of the header name is
preserved, otherwise the header name will be lowercased.'''
key = name.lower()
name = literal and name or key
return self.headers.get(name, None)
Set an HTTP return header "name" with value "value",
appending it following a comma if there was a previous value
set for the header.
'name' is always lowercased before use.
"""
name, value = _scrubHeader(name, value)
name = name.lower()
headers = self.headers
if headers.has_key(name):
h = headers[name]
h = "%s%s\r\n\t%s" % (h, delimiter, value)
else:
h = value
self.setHeader(name,h, scrubbed=True)
def addHeader(self, name, value):
'''\
Set a new HTTP return header with the given value, while retaining
any previously set headers with the same name.'''
""" Set a new HTTP return header with the given value,
Retain any previously set headers with the same name.
Note that this API appneds to the 'accumulated_headers' attribute;
it does not update the 'headers' mapping.
"""
name, value = _scrubHeader(name, value)
self.accumulated_headers = (
"%s%s: %s\r\n" % (self.accumulated_headers, name, value))
self.accumulated_headers.append((name, value))
__setitem__ = setHeader
def setBase(self, base):
"""Set the base URL for the returned document.
If base is None, set to the empty string.
If base is not None, ensure that it has a trailing slach.
"""
if base is None:
base = ''
elif not base.endswith('/'):
base = base + '/'
self.base = str(base)
def insertBase(self,
base_re_search=re.compile('(<base.*?>)',re.I).search
):
# Only insert a base tag if content appears to be html.
content_type = self.headers.get('content-type', '').split(';')[0]
if content_type and (content_type != 'text/html'):
return
if self.base:
body = self.body
if body:
match = start_of_header_search(body)
if match is not None:
index = match.start(0) + len(match.group(0))
ibase = base_re_search(body)
if ibase is None:
self.body = ('%s\n<base href="%s" />\n%s' %
(body[:index], escape(self.base, 1),
body[index:]))
self.setHeader('content-length', len(self.body))
def isHTML(self, s):
s = s.lstrip()
# Note that the string can be big, so s.lower().startswith() is more
# expensive than s[:n].lower().
if (s[:6].lower() == '<html>' or s[:14].lower() == '<!doctype html'):
return 1
if s.find('</') > 0:
return 1
return 0
def setBody(self, body, title='', is_error=0,
bogus_str_search=re.compile(" [a-fA-F0-9]+>$").search,
latin1_alias_match=re.compile(
......@@ -294,21 +425,31 @@ class HTTPResponse(BaseResponse):
r'(iso[-_]8859[-_]1(:1987)?)))?$',re.I).match,
lock=None
):
'''\
Set the body of the response
""" Set the body of the response
Sets the return body equal to the (string) argument "body". Also
updates the "content-length" return header.
If the body is already locked via a previous call, do nothing and
return None.
You can also specify a title, in which case the title and body
will be wrapped up in html, head, title, and body tags.
If the body is a 2-element tuple, then it will be treated
as (title,body)
If is_error is true then the HTML will be formatted as a Zope error
message instead of a generic HTML page.
'''
If body is unicode, encode it.
If body is not a string or unicode, but has an 'asHTML' method, use
the result of that method as the body; otherwise, use the 'str'
of body.
If is_error is true, format the HTML as a Zope error message instead
of a generic HTML page.
Return 'self' (XXX as a true value?).
"""
# allow locking of the body in the same way as the status
if self._locked_body:
return
......@@ -340,7 +481,7 @@ class HTTPResponse(BaseResponse):
bogus_str_search(body) is not None):
self.notFoundError(body[1:-1])
else:
if(title):
if title:
title = str(title)
if not is_error:
self.body = self._html(title, body)
......@@ -350,30 +491,34 @@ class HTTPResponse(BaseResponse):
self.body = body
isHTML = self.isHTML(self.body)
if not self.headers.has_key('content-type'):
if isHTML:
c = 'text/html; charset=%s' % default_encoding
else:
c = 'text/plain; charset=%s' % default_encoding
self.setHeader('content-type', c)
else:
c = self.headers['content-type']
if c.startswith('text/') and not 'charset=' in c:
c = '%s; charset=%s' % (c, default_encoding)
self.setHeader('content-type', c)
content_type = self.headers.get('content-type')
# Some browsers interpret certain characters in Latin 1 as html
# special characters. These cannot be removed by html_quote,
# because this is not the case for all encodings.
content_type = self.headers['content-type']
if content_type == 'text/html' or latin1_alias_match(
content_type) is not None:
if (content_type == 'text/html' or
content_type and latin1_alias_match(content_type) is not None):
body = '&lt;'.join(body.split('\213'))
body = '&gt;'.join(body.split('\233'))
self.body = body
if content_type is None:
if self.isHTML(self.body):
content_type = 'text/html; charset=%s' % default_encoding
else:
content_type = 'text/plain; charset=%s' % default_encoding
self.setHeader('content-type', content_type)
else:
if (content_type.startswith('text/') and
'charset=' not in content_type):
content_type = '%s; charset=%s' % (content_type,
default_encoding)
self.setHeader('content-type', content_type)
self.setHeader('content-length', len(self.body))
self.insertBase()
if self.use_HTTP_content_compression and \
self.headers.get('content-encoding', 'gzip') == 'gzip':
# use HTTP content encoding to compress body contents unless
......@@ -402,10 +547,10 @@ class HTTPResponse(BaseResponse):
# respect Accept-Encoding client header
vary = self.getHeader('Vary')
if vary is None or 'Accept-Encoding' not in vary:
self.appendHeader('Vary','Accept-Encoding')
self.appendHeader('Vary', 'Accept-Encoding')
return self
def enableHTTPCompression(self,REQUEST={},force=0,disable=0,query=0):
def enableHTTPCompression(self, REQUEST={}, force=0, disable=0, query=0):
"""Enable HTTP Content Encoding with gzip compression if possible
REQUEST -- used to check if client can accept compression
......@@ -458,11 +603,35 @@ class HTTPResponse(BaseResponse):
return self.use_HTTP_content_compression
def redirect(self, location, status=302, lock=0):
"""Cause a redirection without raising an error"""
self.setStatus(status, lock=lock)
self.setHeader('Location', location)
return str(location)
# The following two methods are part of a private protocol with the
# publisher for handling fatal import errors and TTW shutdown requests.
_shutdown_flag = None
def _requestShutdown(self, exitCode=0):
""" Request that the server shut down with exitCode after fulfilling
the current request.
"""
import ZServer
ZServer.exit_code = exitCode
self._shutdown_flag = 1
def _shutdownRequested(self):
""" Returns true if this request requested a server shutdown.
"""
return self._shutdown_flag is not None
def _encode_unicode(self,body,
charset_re=re.compile(r'(?:application|text)/[-+0-9a-z]+\s*;\s*' +
charset_re=re.compile(
r'(?:application|text)/[-+0-9a-z]+\s*;\s*' +
r'charset=([-_0-9a-z]+' +
r')(?:(?:\s*;)|\Z)',
re.IGNORECASE)):
r')(?:(?:\s*;)|\Z)', re.IGNORECASE)):
def fix_xml_preamble(body, encoding):
""" fixes the encoding in the XML preamble according
......@@ -471,7 +640,8 @@ class HTTPResponse(BaseResponse):
if body.startswith('<?xml'):
pos_right = body.find('?>') # right end of the XML preamble
body = ('<?xml version="1.0" encoding="%s" ?>' % encoding) + body[pos_right+2:]
body = ('<?xml version="1.0" encoding="%s" ?>'
% encoding) + body[pos_right+2:]
return body
# Encode the Unicode data as requested
......@@ -486,133 +656,14 @@ class HTTPResponse(BaseResponse):
return body
else:
if ct.startswith('text/') or ct.startswith('application/'):
self.headers['content-type'] = '%s; charset=%s' % (ct, default_encoding)
self.headers['content-type'] = '%s; charset=%s' % (ct,
default_encoding)
# Use the default character encoding
body = body.encode(default_encoding, 'replace')
body = fix_xml_preamble(body, default_encoding)
return body
def setBase(self,base):
"""Set the base URL for the returned document.
If base is None, or the document already has a base, do nothing."""
if base is None:
base = ''
elif not base.endswith('/'):
base = base+'/'
self.base = str(base)
def insertBase(self,
base_re_search=re.compile('(<base.*?>)',re.I).search
):
# Only insert a base tag if content appears to be html.
content_type = self.headers.get('content-type', '').split(';')[0]
if content_type and (content_type != 'text/html'):
return
if self.base:
body = self.body
if body:
match = start_of_header_search(body)
if match is not None:
index = match.start(0) + len(match.group(0))
ibase = base_re_search(body)
if ibase is None:
self.body = ('%s\n<base href="%s" />\n%s' %
(body[:index], escape(self.base, 1),
body[index:]))
self.setHeader('content-length', len(self.body))
def appendCookie(self, name, value):
'''\
Returns an HTTP header that sets a cookie on cookie-enabled
browsers with a key "name" and value "value". If a value for the
cookie has previously been set in the response object, the new
value is appended to the old one separated by a colon. '''
name = str(name)
value = str(value)
cookies = self.cookies
if cookies.has_key(name):
cookie = cookies[name]
else:
cookie = cookies[name] = {}
if cookie.has_key('value'):
cookie['value'] = '%s:%s' % (cookie['value'], value)
else:
cookie['value'] = value
def expireCookie(self, name, **kw):
'''\
Cause an HTTP cookie to be removed from the browser
The response will include an HTTP header that will remove the cookie
corresponding to "name" on the client, if one exists. This is
accomplished by sending a new cookie with an expiration date
that has already passed. Note that some clients require a path
to be specified - this path must exactly match the path given
when creating the cookie. The path can be specified as a keyword
argument.
'''
name = str(name)
d = kw.copy()
d['max_age'] = 0
d['expires'] = 'Wed, 31-Dec-97 23:59:59 GMT'
apply(HTTPResponse.setCookie, (self, name, 'deleted'), d)
def setCookie(self, name, value, quoted=True, **kw):
'''\
Set an HTTP cookie on the browser
The response will include an HTTP header that sets a cookie on
cookie-enabled browsers with a key "name" and value
"value". This overwrites any previously set value for the
cookie in the Response object.
'''
name = str(name)
value = str(value)
cookies = self.cookies
if cookies.has_key(name):
cookie = cookies[name]
else:
cookie = cookies[name] = {}
for k, v in kw.items():
cookie[k] = v
cookie['value'] = value
cookie['quoted'] = quoted
def appendHeader(self, name, value, delimiter=","):
'''\
Append a value to a header.
Sets an HTTP return header "name" with value "value",
appending it following a comma if there was a previous value
set for the header. '''
name, value = _scrubHeader(name, value)
name = name.lower()
headers = self.headers
if headers.has_key(name):
h = headers[name]
h = "%s%s\r\n\t%s" % (h,delimiter,value)
else:
h = value
self.setHeader(name,h, scrubbed=True)
def isHTML(self, s):
s = s.lstrip()
# Note that the string can be big, so s.lower().startswith() is more
# expensive than s[:n].lower().
if (s[:6].lower() == '<html>' or s[:14].lower() == '<!doctype html'):
return 1
if s.find('</') > 0:
return 1
return 0
# deprecated
def quoteHTML(self, text):
return escape(text, 1)
......@@ -621,20 +672,6 @@ class HTTPResponse(BaseResponse):
tb = format_exception(t, v, tb, as_html=as_html)
return '\n'.join(tb)
def redirect(self, location, status=302, lock=0):
"""Cause a redirection without raising an error"""
self.setStatus(status)
self.setHeader('Location', location)
location = str(location)
if lock:
# Don't let anything change the status code.
# The "lock" argument needs to be set when redirecting
# from a standard_error_message page.
self._locked_status = 1
return location
def _html(self,title,body):
return ("<html>\n"
......@@ -806,7 +843,8 @@ class HTTPResponse(BaseResponse):
if fatal and t is SystemExit and v.code == 0:
body = self.setBody(
(str(t),
'Zope has exited normally.<p>' + self._traceback(t, v, tb) + '</p>'),
'Zope has exited normally.<p>'
+ self._traceback(t, v, tb) + '</p>'),
is_error=1)
else:
try:
......@@ -885,23 +923,23 @@ class HTTPResponse(BaseResponse):
not headers.has_key('transfer-encoding'):
self.setHeader('content-length',len(body))
headersl = []
append = headersl.append
chunks = []
append = chunks.append
# status header must come first.
append("Status: %s" % headers.get('status', '200 OK'))
append("Status: %d %s" % (self.status, self.errmsg))
append("X-Powered-By: Zope (www.zope.org), Python (www.python.org)")
if headers.has_key('status'):
del headers['status']
for key, val in headers.items():
for key, value in headers.items():
if key.lower() == key:
# only change non-literal header names
key = '-'.join([x.capitalize() for x in key.split('-')])
append("%s: %s" % (key, val))
if self.cookies:
headersl = headersl+self._cookie_list()
headersl[len(headersl):] = [self.accumulated_headers, body]
return '\r\n'.join(headersl)
append("%s: %s" % (key, value))
chunks.extend(self._cookie_list())
for key, value in self.accumulated_headers:
append("%s: %s" % (key, value))
append('') # RFC 2616 mandates empty line between headers and payload
append(body)
return '\r\n'.join(chunks)
def write(self,data):
"""\
......
......@@ -10,28 +10,29 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
__doc__="""Python Object Publisher -- Publish Python objects on web servers
$Id: Publish.py 67721 2006-04-28 14:57:35Z regebro $"""
""" Python Object Publisher -- Publish Python objects on web servers
"""
from cStringIO import StringIO
import re
import sys
import time
import sys, os, re, time
import transaction
from Response import Response
from Request import Request
from maybe_lock import allocate_lock
from mapply import mapply
from zExceptions import Redirect
from cStringIO import StringIO
from ZServer.medusa.http_date import build_http_date
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.maybe_lock import allocate_lock
from ZPublisher.mapply import mapply
class WSGIResponse(Response):
class WSGIResponse(HTTPResponse):
"""A response object for WSGI
This Response object knows nothing about ZServer, but tries to be
compatible with the ZServerHTTPResponse.
Most significantly, streaming is not (yet) supported."""
Most significantly, streaming is not (yet) supported.
"""
_streaming = 0
def __str__(self,
......@@ -271,7 +272,7 @@ def publish_module_standard(environ, start_response):
response._http_connection = environ.get('CONNECTION_TYPE', 'close')
response._server_version = environ['SERVER_SOFTWARE']
request = Request(environ['wsgi.input'], environ, response)
request = HTTPRequest(environ['wsgi.input'], environ, response)
# Let's support post-mortem debugging
handle_errors = environ.get('wsgi.handleErrors', True)
......
......@@ -4,6 +4,17 @@ import unittest
class HTTPResponseTests(unittest.TestCase):
_old_default_encoding = None
def tearDown(self):
if self._old_default_encoding is not None:
self._setDefaultEncoding(self._old_default_encoding)
def _setDefaultEncoding(self, value):
from ZPublisher import HTTPResponse as module
(module.default_encoding,
self._old_default_encoding) = (value, module.default_encoding)
def _getTargetClass(self):
from ZPublisher.HTTPResponse import HTTPResponse
......@@ -13,22 +24,180 @@ class HTTPResponseTests(unittest.TestCase):
return self._getTargetClass()(*args, **kw)
def test_setStatus_with_exceptions(self):
def test_ctor_defaults(self):
import sys
response = self._makeOne()
self.assertEqual(response.accumulated_headers, [])
self.assertEqual(response.status, 200)
self.assertEqual(response.errmsg, 'OK')
self.assertEqual(response.base, '')
self.assertEqual(response.body, '')
self.assertEqual(response.cookies, {})
self.failUnless(response.stdout is sys.stdout)
self.failUnless(response.stderr is sys.stderr)
def test_ctor_w_body(self):
response = self._makeOne(body='ABC')
self.assertEqual(response.body, 'ABC')
def test_ctor_w_headers(self):
response = self._makeOne(headers={'foo': 'bar'})
self.assertEqual(response.headers, {'foo': 'bar',
})
def test_ctor_w_status_code(self):
response = self._makeOne(status=401)
self.assertEqual(response.status, 401)
self.assertEqual(response.errmsg, 'Unauthorized')
self.assertEqual(response.headers, {})
def test_ctor_w_status_errmsg(self):
response = self._makeOne(status='Unauthorized')
self.assertEqual(response.status, 401)
self.assertEqual(response.errmsg, 'Unauthorized')
self.assertEqual(response.headers, {})
def test_ctor_w_status_exception(self):
from zExceptions import Unauthorized
response = self._makeOne(status=Unauthorized)
self.assertEqual(response.status, 401)
self.assertEqual(response.errmsg, 'Unauthorized')
self.assertEqual(response.headers, {})
def test_ctor_charset_no_content_type_header(self):
response = self._makeOne(body='foo')
self.assertEqual(response.headers.get('content-type'),
'text/plain; charset=iso-8859-15')
def test_ctor_charset_text_header_no_charset_defaults_latin1(self):
response = self._makeOne(body='foo',
headers={'content-type': 'text/plain'})
self.assertEqual(response.headers.get('content-type'),
'text/plain; charset=iso-8859-15')
def test_ctor_charset_application_header_no_header(self):
response = self._makeOne(body='foo',
headers={'content-type': 'application/foo'})
self.assertEqual(response.headers.get('content-type'),
'application/foo')
def test_ctor_charset_application_header_with_header(self):
response = self._makeOne(body='foo',
headers={'content-type':
'application/foo; charset: something'})
self.assertEqual(response.headers.get('content-type'),
'application/foo; charset: something')
def test_ctor_charset_unicode_body_application_header(self):
BODY = unicode('rger', 'iso-8859-15')
response = self._makeOne(body=BODY,
headers={'content-type': 'application/foo'})
self.assertEqual(response.headers.get('content-type'),
'application/foo; charset=iso-8859-15')
self.assertEqual(response.body, 'rger')
def test_ctor_charset_unicode_body_application_header_diff_encoding(self):
BODY = unicode('rger', 'iso-8859-15')
response = self._makeOne(body=BODY,
headers={'content-type':
'application/foo; charset=utf-8'})
self.assertEqual(response.headers.get('content-type'),
'application/foo; charset=utf-8')
# Body is re-encoded to match the header
self.assertEqual(response.body, BODY.encode('utf-8'))
def test_ctor_body_recodes_to_match_content_type_charset(self):
xml = (u'<?xml version="1.0" encoding="iso-8859-15" ?>\n'
'<foo><bar/></foo>')
response = self._makeOne(body=xml, headers={'content-type':
'text/xml; charset=utf-8'})
self.assertEqual(response.body, xml.replace('iso-8859-15', 'utf-8'))
def test_ctor_body_already_matches_charset_unchanged(self):
xml = (u'<?xml version="1.0" encoding="iso-8859-15" ?>\n'
'<foo><bar/></foo>')
response = self._makeOne(body=xml, headers={'content-type':
'text/xml; charset=iso-8859-15'})
self.assertEqual(response.body, xml)
def test_retry(self):
STDOUT, STDERR = object(), object()
response = self._makeOne(stdout=STDOUT, stderr=STDERR)
cloned = response.retry()
self.failUnless(isinstance(cloned, self._getTargetClass()))
self.failUnless(cloned.stdout is STDOUT)
self.failUnless(cloned.stderr is STDERR)
def test_setStatus_code(self):
response = self._makeOne()
response.setStatus(400)
self.assertEqual(response.status, 400)
self.assertEqual(response.errmsg, 'Bad Request')
def test_setStatus_errmsg(self):
response = self._makeOne()
response.setStatus('Bad Request')
self.assertEqual(response.status, 400)
self.assertEqual(response.errmsg, 'Bad Request')
def test_setStatus_BadRequest(self):
from zExceptions import BadRequest
response = self._makeOne()
response.setStatus(BadRequest)
self.assertEqual(response.status, 400)
self.assertEqual(response.errmsg, 'Bad Request')
def test_setStatus_Unauthorized_exception(self):
from zExceptions import Unauthorized
response = self._makeOne()
response.setStatus(Unauthorized)
self.assertEqual(response.status, 401)
self.assertEqual(response.errmsg, 'Unauthorized')
def test_setStatus_Forbidden_exception(self):
from zExceptions import Forbidden
response = self._makeOne()
response.setStatus(Forbidden)
self.assertEqual(response.status, 403)
self.assertEqual(response.errmsg, 'Forbidden')
def test_setStatus_NotFound_exception(self):
from zExceptions import NotFound
from zExceptions import BadRequest
response = self._makeOne()
response.setStatus(NotFound)
self.assertEqual(response.status, 404)
self.assertEqual(response.errmsg, 'Not Found')
def test_setStatus_ResourceLockedError_exception(self):
response = self._makeOne()
from webdav.Lockable import ResourceLockedError
response.setStatus(ResourceLockedError)
self.assertEqual(response.status, 423)
self.assertEqual(response.errmsg, 'Locked')
def test_setStatus_InternalError_exception(self):
from zExceptions import InternalError
response = self._makeOne()
response.setStatus(InternalError)
self.assertEqual(response.status, 500)
self.assertEqual(response.errmsg, 'Internal Server Error')
for exc_type, code in ((Unauthorized, 401),
(Forbidden, 403),
(NotFound, 404),
(BadRequest, 400),
(InternalError, 500)):
def test_setCookie_no_existing(self):
response = self._makeOne()
response.setStatus(exc_type)
self.assertEqual(response.status, code)
response.setCookie('foo', 'bar')
cookie = response.cookies.get('foo', None)
self.assertEqual(len(cookie), 2)
self.assertEqual(cookie.get('value'), 'bar')
self.assertEqual(cookie.get('quoted'), True)
def test_setCookie_w_existing(self):
response = self._makeOne()
response.setCookie('foo', 'bar')
response.setCookie('foo', 'baz')
cookie = response.cookies.get('foo', None)
self.assertEqual(len(cookie), 2)
self.assertEqual(cookie.get('value'), 'baz')
self.assertEqual(cookie.get('quoted'), True)
def test_setCookie_no_attrs(self):
response = self._makeOne()
......@@ -37,7 +206,6 @@ class HTTPResponseTests(unittest.TestCase):
self.assertEqual(len(cookie), 2)
self.assertEqual(cookie.get('value'), 'bar')
self.assertEqual(cookie.get('quoted'), True)
cookies = response._cookie_list()
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"')
......@@ -123,15 +291,6 @@ class HTTPResponseTests(unittest.TestCase):
self.assertEqual(len(cookies), 1)
self.assertEqual(cookies[0], 'Set-Cookie: foo="bar"')
def test_expireCookie(self):
response = self._makeOne()
response.expireCookie('foo', path='/')
cookie = response.cookies.get('foo', None)
self.failUnless(cookie)
self.assertEqual(cookie.get('expires'), 'Wed, 31-Dec-97 23:59:59 GMT')
self.assertEqual(cookie.get('max_age'), 0)
self.assertEqual(cookie.get('path'), '/')
def test_setCookie_w_httponly_true_value(self):
response = self._makeOne()
response.setCookie('foo', 'bar', http_only=True)
......@@ -170,6 +329,31 @@ class HTTPResponseTests(unittest.TestCase):
self.assertEqual(len(cookie_list), 1)
self.assertEqual(cookie_list[0], 'Set-Cookie: foo=bar')
def test_appendCookie_w_existing(self):
response = self._makeOne()
response.setCookie('foo', 'bar', path='/')
response.appendCookie('foo', 'baz')
cookie = response.cookies.get('foo', None)
self.failUnless(cookie)
self.assertEqual(cookie.get('value'), 'bar:baz')
self.assertEqual(cookie.get('path'), '/')
def test_appendCookie_no_existing(self):
response = self._makeOne()
response.appendCookie('foo', 'baz')
cookie = response.cookies.get('foo', None)
self.failUnless(cookie)
self.assertEqual(cookie.get('value'), 'baz')
def test_expireCookie(self):
response = self._makeOne()
response.expireCookie('foo', path='/')
cookie = response.cookies.get('foo', None)
self.failUnless(cookie)
self.assertEqual(cookie.get('expires'), 'Wed, 31-Dec-97 23:59:59 GMT')
self.assertEqual(cookie.get('max_age'), 0)
self.assertEqual(cookie.get('path'), '/')
def test_expireCookie1160(self):
# Verify that the cookie is expired even if an expires kw arg is passed
# http://zope.org/Collectors/Zope/1160
......@@ -182,23 +366,21 @@ class HTTPResponseTests(unittest.TestCase):
self.assertEqual(cookie.get('max_age'), 0)
self.assertEqual(cookie.get('path'), '/')
def test_appendCookie(self):
def test_getHeader_nonesuch(self):
response = self._makeOne()
response.setCookie('foo', 'bar', path='/')
response.appendCookie('foo', 'baz')
cookie = response.cookies.get('foo', None)
self.failUnless(cookie)
self.assertEqual(cookie.get('value'), 'bar:baz')
self.assertEqual(cookie.get('path'), '/')
self.assertEqual(response.getHeader('nonesuch'), None)
def test_appendHeader(self):
response = self._makeOne()
response.setHeader('foo', 'bar')
response.appendHeader('foo', 'foo')
self.assertEqual(response.headers.get('foo'), 'bar,\r\n\tfoo')
response.setHeader('xxx', 'bar')
response.appendHeader('XXX', 'foo')
self.assertEqual(response.headers.get('xxx'), 'bar,\r\n\tfoo')
def test_getHeader_existing(self):
response = self._makeOne(headers={'foo': 'bar'})
self.assertEqual(response.getHeader('foo'), 'bar')
def test_getHeader_existing_not_literal(self):
response = self._makeOne(headers={'foo': 'bar'})
self.assertEqual(response.getHeader('Foo'), 'bar')
def test_getHeader_existing_w_literal(self):
response = self._makeOne(headers={'Foo': 'Bar'})
self.assertEqual(response.getHeader('Foo', literal=True), 'Bar')
def test_setHeader(self):
response = self._makeOne()
......@@ -217,64 +399,57 @@ class HTTPResponseTests(unittest.TestCase):
self.assertEqual(response.getHeader('SPAM', literal=True), 'eggs')
self.assertEqual(response.getHeader('spam'), None)
def test_setStatus_ResourceLockedError(self):
def test_setHeader_drops_CRLF(self):
# RFC2616 disallows CRLF in a header value.
response = self._makeOne()
from webdav.Lockable import ResourceLockedError
response.setStatus(ResourceLockedError)
self.assertEqual(response.status, 423)
def test_charset_no_header(self):
response = self._makeOne(body='foo')
self.assertEqual(response.headers.get('content-type'),
'text/plain; charset=iso-8859-15')
def test_charset_text_header(self):
response = self._makeOne(body='foo',
headers={'content-type': 'text/plain'})
self.assertEqual(response.headers.get('content-type'),
'text/plain; charset=iso-8859-15')
response.setHeader('Location',
'http://www.ietf.org/rfc/\r\nrfc2616.txt')
self.assertEqual(response.headers['location'],
'http://www.ietf.org/rfc/rfc2616.txt')
def test_charset_application_header_no_header(self):
response = self._makeOne(body='foo',
headers={'content-type': 'application/foo'})
self.assertEqual(response.headers.get('content-type'),
'application/foo')
def test_setHeader_Set_Cookie_special_case(self):
# This is crazy, given that we have APIs for cookies. Special
# behavior will go away in Zope 2.13
response = self._makeOne()
response.setHeader('Set-Cookie', 'foo="bar"')
self.assertEqual(response.getHeader('Set-Cookie'), None)
self.assertEqual(response.accumulated_headers,
[('Set-Cookie', 'foo="bar"')])
def test_charset_application_header_with_header(self):
response = self._makeOne(body='foo',
headers={'content-type': 'application/foo; charset: something'})
self.assertEqual(response.headers.get('content-type'),
'application/foo; charset: something')
def test_setHeader_drops_CRLF_when_accumulating(self):
# RFC2616 disallows CRLF in a header value.
# This is crazy, given that we have APIs for cookies. Special
# behavior will go away in Zope 2.13
response = self._makeOne()
response.setHeader('Set-Cookie', 'allowed="OK"')
response.setHeader('Set-Cookie',
'violation="http://www.ietf.org/rfc/\r\nrfc2616.txt"')
self.assertEqual(response.accumulated_headers,
[('Set-Cookie', 'allowed="OK"'),
('Set-Cookie',
'violation="http://www.ietf.org/rfc/rfc2616.txt"')])
def test_charset_application_header_unicode(self):
response = self._makeOne(body=unicode('rger', 'iso-8859-15'),
headers={'content-type': 'application/foo'})
self.assertEqual(response.headers.get('content-type'),
'application/foo; charset=iso-8859-15')
self.assertEqual(response.body, 'rger')
def test_appendHeader_no_existing(self):
response = self._makeOne()
response.appendHeader('foo', 'foo')
self.assertEqual(response.headers.get('foo'), 'foo')
def test_charset_application_header_unicode_1(self):
response = self._makeOne(body=unicode('rger', 'iso-8859-15'),
headers={'content-type': 'application/foo; charset=utf-8'})
self.assertEqual(response.headers.get('content-type'),
'application/foo; charset=utf-8')
self.assertEqual(response.body, unicode('rger',
'iso-8859-15').encode('utf-8'))
def test_appendHeader_no_existing_case_insensative(self):
response = self._makeOne()
response.appendHeader('Foo', 'foo')
self.assertEqual(response.headers.get('foo'), 'foo')
def test_XMLEncodingRecoding(self):
xml = u'<?xml version="1.0" encoding="iso-8859-15" ?>\n<foo><bar/></foo>'
response = self._makeOne(body=xml, headers={'content-type': 'text/xml; charset=utf-8'})
self.assertEqual(xml.replace('iso-8859-15', 'utf-8')==response.body, True)
response = self._makeOne(body=xml, headers={'content-type': 'text/xml; charset=iso-8859-15'})
self.assertEqual(xml==response.body, True)
def test_appendHeader_w_existing(self):
response = self._makeOne()
response.setHeader('foo', 'bar')
response.appendHeader('foo', 'foo')
self.assertEqual(response.headers.get('foo'), 'bar,\r\n\tfoo')
def test_addHeader_drops_CRLF(self):
# RFC2616 disallows CRLF in a header value.
def test_appendHeader_w_existing_case_insenstative(self):
response = self._makeOne()
response.addHeader('Location',
'http://www.ietf.org/rfc/\r\nrfc2616.txt')
self.assertEqual(response.accumulated_headers,
'Location: http://www.ietf.org/rfc/rfc2616.txt\r\n')
response.setHeader('xxx', 'bar')
response.appendHeader('XXX', 'foo')
self.assertEqual(response.headers.get('xxx'), 'bar,\r\n\tfoo')
def test_appendHeader_drops_CRLF(self):
# RFC2616 disallows CRLF in a header value.
......@@ -284,44 +459,659 @@ class HTTPResponseTests(unittest.TestCase):
self.assertEqual(response.headers['location'],
'http://www.ietf.org/rfc/rfc2616.txt')
def test_setHeader_drops_CRLF(self):
# RFC2616 disallows CRLF in a header value.
def test_addHeader_is_case_sensitive(self):
response = self._makeOne()
response.setHeader('Location',
'http://www.ietf.org/rfc/\r\nrfc2616.txt')
self.assertEqual(response.headers['location'],
'http://www.ietf.org/rfc/rfc2616.txt')
response.addHeader('Location', 'http://www.ietf.org/rfc/rfc2616.txt')
self.assertEqual(response.accumulated_headers,
[('Location', 'http://www.ietf.org/rfc/rfc2616.txt')])
def test_setHeader_drops_CRLF_when_accumulating(self):
def test_addHeader_drops_CRLF(self):
# RFC2616 disallows CRLF in a header value.
response = self._makeOne()
response.setHeader('Set-Cookie', 'allowed="OK"')
response.setHeader('Set-Cookie',
'violation="http://www.ietf.org/rfc/\r\nrfc2616.txt"')
response.addHeader('Location',
'http://www.ietf.org/rfc/\r\nrfc2616.txt')
self.assertEqual(response.accumulated_headers,
'Set-Cookie: allowed="OK"\r\n' +
'Set-Cookie: '
'violation="http://www.ietf.org/rfc/rfc2616.txt"\r\n')
[('Location', 'http://www.ietf.org/rfc/rfc2616.txt')])
def test_setBase_None(self):
response = self._makeOne()
response.base = 'BEFORE'
response.setBase(None)
self.assertEqual(response.base, '')
def test_setBase_no_trailing_path(self):
response = self._makeOne()
response.setBase('foo')
self.assertEqual(response.base, 'foo/')
def test_setBase_w_trailing_path(self):
response = self._makeOne()
response.setBase('foo/')
self.assertEqual(response.base, 'foo/')
def test_insertBase_not_HTML_no_change(self):
response = self._makeOne()
response.setHeader('Content-Type', 'application/pdf')
response.setHeader('Content-Length', 8)
response.body = 'BLAHBLAH'
response.insertBase()
self.assertEqual(response.body, 'BLAHBLAH')
self.assertEqual(response.getHeader('Content-Length'), '8')
def test_insertBase_HTML_no_base_w_head_not_munged(self):
HTML = '<html><head></head><body></body></html>'
response = self._makeOne()
response.setHeader('Content-Type', 'text/html')
response.setHeader('Content-Length', len(HTML))
response.body = HTML
response.insertBase()
self.assertEqual(response.body, HTML)
self.assertEqual(response.getHeader('Content-Length'), str(len(HTML)))
def test_insertBase_HTML_w_base_no_head_not_munged(self):
HTML = '<html><body></body></html>'
response = self._makeOne()
response.setHeader('Content-Type', 'text/html')
response.setHeader('Content-Length', len(HTML))
response.body = HTML
response.insertBase()
self.assertEqual(response.body, HTML)
self.assertEqual(response.getHeader('Content-Length'), str(len(HTML)))
def test_insertBase_HTML_w_base_w_head_munged(self):
HTML = '<html><head></head><body></body></html>'
MUNGED = ('<html><head>\n'
'<base href="http://example.com/base/" />\n'
'</head><body></body></html>')
response = self._makeOne()
response.setHeader('Content-Type', 'text/html')
response.setHeader('Content-Length', 8)
response.body = HTML
response.setBase('http://example.com/base/')
response.insertBase()
self.assertEqual(response.body, MUNGED)
self.assertEqual(response.getHeader('Content-Length'),
str(len(MUNGED)))
def test_setBody_w_locking(self):
response = self._makeOne()
response.setBody('BEFORE', lock=True)
result = response.setBody('AFTER')
self.failIf(result)
self.assertEqual(response.body, 'BEFORE')
def test_setBody_empty_unchanged(self):
response = self._makeOne()
response.body = 'BEFORE'
result = response.setBody('')
self.failUnless(result)
self.assertEqual(response.body, 'BEFORE')
self.assertEqual(response.getHeader('Content-Type'), None)
self.assertEqual(response.getHeader('Content-Length'), None)
def test_setBody_2_tuple_wo_is_error_converted_to_HTML(self):
EXPECTED = ("<html>\n"
"<head>\n<title>TITLE</title>\n</head>\n"
"<body>\nBODY\n</body>\n"
"</html>\n")
response = self._makeOne()
response.body = 'BEFORE'
result = response.setBody(('TITLE', 'BODY'))
self.failUnless(result)
self.assertEqual(response.body, EXPECTED)
self.assertEqual(response.getHeader('Content-Type'),
'text/html; charset=iso-8859-15')
self.assertEqual(response.getHeader('Content-Length'),
str(len(EXPECTED)))
def test_setBody_2_tuple_w_is_error_converted_to_Site_Error(self):
response = self._makeOne()
response.body = 'BEFORE'
result = response.setBody(('TITLE', 'BODY'), is_error=True)
self.failUnless(result)
self.failIf('BEFORE' in response.body)
self.failUnless('<h2>Site Error</h2>' in response.body)
self.failUnless('TITLE' in response.body)
self.failUnless('BODY' in response.body)
self.assertEqual(response.getHeader('Content-Type'),
'text/html; charset=iso-8859-15')
def test_setBody_string_not_HTML(self):
response = self._makeOne()
result = response.setBody('BODY')
self.failUnless(result)
self.assertEqual(response.body, 'BODY')
self.assertEqual(response.getHeader('Content-Type'),
'text/plain; charset=iso-8859-15')
self.assertEqual(response.getHeader('Content-Length'), '4')
def test_setBody_string_HTML(self):
HTML = '<html><head></head><body></body></html>'
response = self._makeOne()
result = response.setBody(HTML)
self.failUnless(result)
self.assertEqual(response.body, HTML)
self.assertEqual(response.getHeader('Content-Type'),
'text/html; charset=iso-8859-15')
self.assertEqual(response.getHeader('Content-Length'), str(len(HTML)))
def test_setBody_object_with_asHTML(self):
HTML = '<html><head></head><body></body></html>'
class Dummy:
def asHTML(self):
return HTML
response = self._makeOne()
result = response.setBody(Dummy())
self.failUnless(result)
self.assertEqual(response.body, HTML)
self.assertEqual(response.getHeader('Content-Type'),
'text/html; charset=iso-8859-15')
self.assertEqual(response.getHeader('Content-Length'), str(len(HTML)))
def test_setBody_object_with_unicode(self):
HTML = u'<html><head></head><body><h1>Tr\u0039s Bien</h1></body></html>'
ENCODED = HTML.encode('iso-8859-15')
response = self._makeOne()
result = response.setBody(HTML)
self.failUnless(result)
self.assertEqual(response.body, ENCODED)
self.assertEqual(response.getHeader('Content-Type'),
'text/html; charset=iso-8859-15')
self.assertEqual(response.getHeader('Content-Length'),
str(len(ENCODED)))
def test_setBody_w_bogus_pseudo_HTML(self):
# The 2001 checkin message which added the path-under-test says:
# (r19315): "merged content type on error fixes from 2.3
# If the str of the object returs a Python "pointer" looking mess,
# don't let it get treated as HTML.
from ZPublisher import NotFound
BOGUS = '<Bogus a39d53d>'
response = self._makeOne()
self.assertRaises(NotFound, response.setBody, BOGUS)
def test_setBody_html_no_charset_escapes_latin1_gt_lt(self):
response = self._makeOne()
BEFORE = ('<html><head></head><body><p>LT: \213</p>'
'<p>GT: \233</p></body></html>')
AFTER = ('<html><head></head><body><p>LT: &lt;</p>'
'<p>GT: &gt;</p></body></html>')
response.setHeader('Content-Type', 'text/html')
result = response.setBody(BEFORE)
self.failUnless(result)
self.assertEqual(response.body, AFTER)
self.assertEqual(response.getHeader('Content-Length'), str(len(AFTER)))
def test_setBody_latin_alias_escapes_latin1_gt_lt(self):
response = self._makeOne()
BEFORE = ('<html><head></head><body><p>LT: \213</p>'
'<p>GT: \233</p></body></html>')
AFTER = ('<html><head></head><body><p>LT: &lt;</p>'
'<p>GT: &gt;</p></body></html>')
response.setHeader('Content-Type', 'text/html; charset=latin1')
result = response.setBody(BEFORE)
self.failUnless(result)
self.assertEqual(response.body, AFTER)
self.assertEqual(response.getHeader('Content-Length'), str(len(AFTER)))
def test_setBody_calls_insertBase(self):
response = self._makeOne()
lamb = {}
def _insertBase():
lamb['flavor'] = 'CURRY'
response.insertBase = _insertBase
response.setBody('Garlic Naan')
self.assertEqual(lamb['flavor'], 'CURRY')
#def test_setBody_w_HTTP_content_compression(self):
def test_setBody_compression_uncompressible_mimetype(self):
BEFORE = 'foo' * 100 # body must get smaller on compression
response = self._makeOne()
response.setHeader('Content-Type', 'image/jpeg')
response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
response.setBody(BEFORE)
self.failIf(response.getHeader('Content-Encoding'))
self.assertEqual(response.body, BEFORE)
def test_setBody_compression_existing_encoding(self):
BEFORE = 'foo' * 100 # body must get smaller on compression
response = self._makeOne()
response.setHeader('Content-Encoding', 'piglatin')
response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
response.setBody(BEFORE)
self.assertEqual(response.getHeader('Content-Encoding'), 'piglatin')
self.assertEqual(response.body, BEFORE)
def test_setBody_compression_too_short_to_gzip(self):
BEFORE = 'foo' # body must get smaller on compression
response = self._makeOne()
response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
response.setBody(BEFORE)
self.failIf(response.getHeader('Content-Encoding'))
self.assertEqual(response.body, BEFORE)
def test_setBody_compression_no_prior_vary_header(self):
# Vary header should be added here
response = self._makeOne()
response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
response.setBody('foo' * 100) # body must get smaller on compression
self.failUnless('Accept-Encoding' in response.getHeader('Vary'))
def test_setBody_compression_vary(self):
def test_setBody_compression_w_prior_vary_header_wo_encoding(self):
# Vary header should be added here
response = self._makeOne()
response.enableHTTPCompression(REQUEST={'HTTP_ACCEPT_ENCODING': 'gzip'})
response.setBody('foo'*100) # body must get smaller on compression
self.assertEqual('Accept-Encoding' in response.getHeader('Vary'), True)
# But here it would be unnecessary
response.setHeader('Vary', 'Cookie')
response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
response.setBody('foo' * 100) # body must get smaller on compression
self.failUnless('Accept-Encoding' in response.getHeader('Vary'))
def test_setBody_compression_w_prior_vary_header_incl_encoding(self):
# Vary header already had Accept-Ecoding', do'nt munge
PRIOR = 'Accept-Encoding,Accept-Language'
response = self._makeOne()
response.enableHTTPCompression(REQUEST={'HTTP_ACCEPT_ENCODING': 'gzip'})
response.setHeader('Vary', 'Accept-Encoding,Accept-Language')
before = response.getHeader('Vary')
response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'})
response.setHeader('Vary', PRIOR)
response.setBody('foo'*100)
self.assertEqual(before, response.getHeader('Vary'))
self.assertEqual(response.getHeader('Vary'), PRIOR)
def test_setBody_compression_no_prior_vary_header_but_forced(self):
# Compression forced, don't add a Vary entry for compression.
response = self._makeOne()
response.enableHTTPCompression({'HTTP_ACCEPT_ENCODING': 'gzip'},
force=True)
response.setBody('foo' * 100) # body must get smaller on compression
self.assertEqual(response.getHeader('Vary'), None)
def test_redirect_defaults(self):
URL = 'http://example.com'
response = self._makeOne()
result = response.redirect(URL)
self.assertEqual(result, URL)
self.assertEqual(response.status, 302)
self.assertEqual(response.getHeader('Location'), URL)
self.failIf(response._locked_status)
def test_redirect_explicit_status(self):
URL = 'http://example.com'
response = self._makeOne()
result = response.redirect(URL, status=307)
self.assertEqual(response.status, 307)
self.assertEqual(response.getHeader('Location'), URL)
self.failIf(response._locked_status)
def test_redirect_w_lock(self):
URL = 'http://example.com'
response = self._makeOne()
result = response.redirect(URL, lock=True)
self.assertEqual(response.status, 302)
self.assertEqual(response.getHeader('Location'), URL)
self.failUnless(response._locked_status)
def test__encode_unicode_no_content_type_uses_default_encoding(self):
self._setDefaultEncoding('UTF8')
UNICODE = u'<h1>Tr\u0039s Bien</h1>'
response = self._makeOne()
self.assertEqual(response._encode_unicode(UNICODE),
UNICODE.encode('UTF8'))
def test__encode_unicode_w_content_type_no_charset_updates_charset(self):
self._setDefaultEncoding('UTF8')
UNICODE = u'<h1>Tr\u0039s Bien</h1>'
response = self._makeOne()
response.setHeader('Content-Type', 'text/html')
self.assertEqual(response._encode_unicode(UNICODE),
UNICODE.encode('UTF8'))
response.getHeader('Content-Type', 'text/html; charset=UTF8')
def test__encode_unicode_w_content_type_w_charset(self):
self._setDefaultEncoding('UTF8')
UNICODE = u'<h1>Tr\u0039s Bien</h1>'
response = self._makeOne()
response.setHeader('Content-Type', 'text/html; charset=latin1')
self.assertEqual(response._encode_unicode(UNICODE),
UNICODE.encode('latin1'))
response.getHeader('Content-Type', 'text/html; charset=latin1')
def test__encode_unicode_w_content_type_w_charset_xml_preamble(self):
self._setDefaultEncoding('UTF8')
PREAMBLE = u'<?xml version="1.0" ?>'
ELEMENT = u'<element>Tr\u0039s Bien</element>'
UNICODE = u'\n'.join([PREAMBLE, ELEMENT])
response = self._makeOne()
response.setHeader('Content-Type', 'text/html; charset=latin1')
self.assertEqual(response._encode_unicode(UNICODE),
'<?xml version="1.0" encoding="latin1" ?>\n'
+ ELEMENT.encode('latin1'))
response.getHeader('Content-Type', 'text/html; charset=latin1')
def test_quoteHTML(self):
BEFORE = '<p>This is a story about a boy named "Sue"</p>'
AFTER = ('&lt;p&gt;This is a story about a boy named '
'&quot;Sue&quot;&lt;/p&gt;')
response = self._makeOne()
self.assertEqual(response.quoteHTML(BEFORE), AFTER)
def test_notFoundError(self):
from ZPublisher import NotFound
response = self._makeOne()
try:
response.notFoundError()
except NotFound, raised:
self.assertEqual(response.status, 404)
self.failUnless("<p><b>Resource:</b> Unknown</p>" in str(raised))
else:
self.fail("Didn't raise NotFound")
def test_notFoundError_w_entry(self):
from ZPublisher import NotFound
response = self._makeOne()
try:
response.notFoundError('ENTRY')
except NotFound, raised:
self.assertEqual(response.status, 404)
self.failUnless("<p><b>Resource:</b> ENTRY</p>" in str(raised))
else:
self.fail("Didn't raise NotFound")
def test_forbiddenError(self):
from ZPublisher import NotFound
response = self._makeOne()
try:
response.forbiddenError()
except NotFound, raised:
self.assertEqual(response.status, 404)
self.failUnless("<p><b>Resource:</b> Unknown</p>" in str(raised))
else:
self.fail("Didn't raise NotFound")
def test_forbiddenError_w_entry(self):
from ZPublisher import NotFound
response = self._makeOne()
try:
response.forbiddenError('ENTRY')
except NotFound, raised:
self.assertEqual(response.status, 404)
self.failUnless("<p><b>Resource:</b> ENTRY</p>" in str(raised))
else:
self.fail("Didn't raise NotFound")
def test_debugError(self):
from ZPublisher import NotFound
response = self._makeOne()
try:
response.debugError('testing')
except NotFound, raised:
self.assertEqual(response.status, 200)
self.failUnless("Zope has encountered a problem publishing "
"your object.<p>\ntesting</p>" in str(raised))
else:
self.fail("Didn't raise NotFound")
def test_badRequestError_valid_parameter_name(self):
from ZPublisher import BadRequest
response = self._makeOne()
try:
response.badRequestError('some_parameter')
except BadRequest, raised:
self.assertEqual(response.status, 400)
self.failUnless("The parameter, <em>some_parameter</em>, "
"was omitted from the request." in str(raised))
else:
self.fail("Didn't raise BadRequest")
def test_badRequestError_invalid_parameter_name(self):
from ZPublisher import InternalError
response = self._makeOne()
try:
response.badRequestError('URL1')
except InternalError, raised:
self.assertEqual(response.status, 400)
self.failUnless("Sorry, an internal error occurred in this "
"resource." in str(raised))
else:
self.fail("Didn't raise InternalError")
def test__unauthorized_no_realm(self):
response = self._makeOne()
response.realm = ''
response._unauthorized()
self.failIf('WWW-Authenticate' in response.headers)
def test__unauthorized_w_default_realm(self):
response = self._makeOne()
response._unauthorized()
self.failUnless('WWW-Authenticate' in response.headers) #literal
self.assertEqual(response.headers['WWW-Authenticate'],
'basic realm="Zope"')
def test__unauthorized_w_realm(self):
response = self._makeOne()
response.realm = 'Folly'
response._unauthorized()
self.failUnless('WWW-Authenticate' in response.headers) #literal
self.assertEqual(response.headers['WWW-Authenticate'],
'basic realm="Folly"')
def test_unauthorized_no_debug_mode(self):
from zExceptions import Unauthorized
response = self._makeOne()
try:
response.unauthorized()
except Unauthorized, raised:
self.assertEqual(response.status, 200) # publisher sets 401 later
self.failUnless("<strong>You are not authorized "
"to access this resource.</strong>" in str(raised))
else:
self.fail("Didn't raise Unauthorized")
def test_unauthorized_w_debug_mode_no_credentials(self):
from zExceptions import Unauthorized
response = self._makeOne()
response.debug_mode = True
try:
response.unauthorized()
except Unauthorized, raised:
self.failUnless("<p>\nNo Authorization header found.</p>"
in str(raised))
else:
self.fail("Didn't raise Unauthorized")
def test_unauthorized_w_debug_mode_w_credentials(self):
from zExceptions import Unauthorized
response = self._makeOne()
response.debug_mode = True
response._auth = 'bogus'
try:
response.unauthorized()
except Unauthorized, raised:
self.failUnless("<p>\nUsername and password are not correct.</p>"
in str(raised))
else:
self.fail("Didn't raise Unauthorized")
def test___str__already_wrote(self):
response = self._makeOne()
response._wrote = True
self.assertEqual(str(response), '')
def test___str__empty(self):
response = self._makeOne()
result = str(response)
lines = result.split('\r\n')
self.assertEqual(len(lines), 5)
self.assertEqual(lines[0], 'Status: 200 OK')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Content-Length: 0')
self.assertEqual(lines[3], '')
self.assertEqual(lines[4], '')
def test___str__existing_content_length(self):
# The application can break clients by setting a bogus length; we
# don't do anything to stop that.
response = self._makeOne()
response.setHeader('Content-Length', 42)
result = str(response)
lines = result.split('\r\n')
self.assertEqual(len(lines), 5)
self.assertEqual(lines[0], 'Status: 200 OK')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Content-Length: 42')
self.assertEqual(lines[3], '')
self.assertEqual(lines[4], '')
def test___str__existing_transfer_encoding(self):
# If 'Transfer-Encoding' is set, don't force 'Content-Length'.
response = self._makeOne()
response.setHeader('Transfer-Encoding', 'slurry')
result = str(response)
lines = result.split('\r\n')
self.assertEqual(len(lines), 5)
self.assertEqual(lines[0], 'Status: 200 OK')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Transfer-Encoding: slurry')
self.assertEqual(lines[3], '')
self.assertEqual(lines[4], '')
def test___str__after_setHeader(self):
response = self._makeOne()
response.setHeader('x-consistency', 'Foolish')
result = str(response)
lines = result.split('\r\n')
self.assertEqual(len(lines), 6)
self.assertEqual(lines[0], 'Status: 200 OK')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Content-Length: 0')
self.assertEqual(lines[3], 'X-Consistency: Foolish')
self.assertEqual(lines[4], '')
self.assertEqual(lines[5], '')
def test___str__after_setHeader_literal(self):
response = self._makeOne()
response.setHeader('X-consistency', 'Foolish', literal=True)
result = str(response)
lines = result.split('\r\n')
self.assertEqual(len(lines), 6)
self.assertEqual(lines[0], 'Status: 200 OK')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Content-Length: 0')
self.assertEqual(lines[3], 'X-consistency: Foolish')
self.assertEqual(lines[4], '')
self.assertEqual(lines[5], '')
def test___str__after_redirect(self):
response = self._makeOne()
response.redirect('http://example.com/')
result = str(response)
lines = result.split('\r\n')
self.assertEqual(len(lines), 6)
self.assertEqual(lines[0], 'Status: 302 Moved Temporarily')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Content-Length: 0')
self.assertEqual(lines[3], 'Location: http://example.com/')
self.assertEqual(lines[4], '')
self.assertEqual(lines[5], '')
def test___str__after_setCookie_appendCookie(self):
response = self._makeOne()
response.setCookie('foo', 'bar', path='/')
response.appendCookie('foo', 'baz')
result = str(response)
lines = result.split('\r\n')
self.assertEqual(len(lines), 6)
self.assertEqual(lines[0], 'Status: 200 OK')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Content-Length: 0')
self.assertEqual(lines[3], 'Set-Cookie: foo="bar%3Abaz"; '
'Path=/')
self.assertEqual(lines[4], '')
self.assertEqual(lines[5], '')
def test___str__after_expireCookie(self):
response = self._makeOne()
response.expireCookie('qux', path='/')
result = str(response)
lines = result.split('\r\n')
self.assertEqual(len(lines), 6)
self.assertEqual(lines[0], 'Status: 200 OK')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Content-Length: 0')
self.assertEqual(lines[3], 'Set-Cookie: qux="deleted"; '
'Path=/; '
'Expires=Wed, 31-Dec-97 23:59:59 GMT; '
'Max-Age=0')
self.assertEqual(lines[4], '')
self.assertEqual(lines[5], '')
def test___str__after_addHeader(self):
response = self._makeOne()
response.addHeader('X-Consistency', 'Foolish')
response.addHeader('X-Consistency', 'Oatmeal')
result = str(response)
lines = result.split('\r\n')
self.assertEqual(len(lines), 7)
self.assertEqual(lines[0], 'Status: 200 OK')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Content-Length: 0')
self.assertEqual(lines[3], 'X-Consistency: Foolish')
self.assertEqual(lines[4], 'X-Consistency: Oatmeal')
self.assertEqual(lines[5], '')
self.assertEqual(lines[6], '')
def test___str__w_body(self):
response = self._makeOne()
response.setBody('BLAH')
result = str(response)
lines = result.split('\r\n')
self.assertEqual(len(lines), 6)
self.assertEqual(lines[0], 'Status: 200 OK')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Content-Length: 4')
self.assertEqual(lines[3],
'Content-Type: text/plain; charset=iso-8859-15')
self.assertEqual(lines[4], '')
self.assertEqual(lines[5], 'BLAH')
def test_write_already_wrote(self):
from StringIO import StringIO
stdout = StringIO()
response = self._makeOne(stdout=stdout)
response.write('Kilroy was here!')
self.failUnless(response._wrote)
lines = stdout.getvalue().split('\r\n')
self.assertEqual(len(lines), 5)
self.assertEqual(lines[0], 'Status: 200 OK')
self.assertEqual(lines[1], 'X-Powered-By: Zope (www.zope.org), '
'Python (www.python.org)')
self.assertEqual(lines[2], 'Content-Length: 0')
self.assertEqual(lines[3], '')
self.assertEqual(lines[4], 'Kilroy was here!')
def test_write_not_already_wrote(self):
from StringIO import StringIO
stdout = StringIO()
response = self._makeOne(stdout=stdout)
response._wrote = True
response.write('Kilroy was here!')
lines = stdout.getvalue().split('\r\n')
self.assertEqual(len(lines), 1)
self.assertEqual(lines[0], 'Kilroy was here!')
#TODO
# def test_exception_* WAAAAAA!
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(HTTPResponseTests, 'test'))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
......@@ -17,21 +17,30 @@ The HTTPResponse class takes care of server headers, response munging
and logging duties.
"""
import time, re, sys, tempfile
import asyncore
from cStringIO import StringIO
import re
import tempfile
import thread
import time
from zope.event import notify
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.Iterators import IStreamIterator
from ZPublisher.pubevents import PubBeforeStreaming
from medusa.http_date import build_http_date
from PubCore.ZEvent import Wakeup
from medusa.producers import hooked_producer
from medusa import http_server
import asyncore
from Producers import ShutdownProducer, LoggingProducer, CallbackProducer, \
file_part_producer, file_close_producer, iterator_producer
import DebugLogger
from ZPublisher.HTTPResponse import HTTPResponse # XXX WTF?
from ZPublisher.Iterators import IStreamIterator # XXX WTF?
from ZPublisher.pubevents import PubBeforeStreaming # XXX WTF?
from ZServer.medusa.http_date import build_http_date
from ZServer.PubCore.ZEvent import Wakeup
from ZServer.medusa import http_server
from ZServer.Producers import ShutdownProducer
from ZServer.Producers import LoggingProducer
from ZServer.Producers import CallbackProducer
from ZServer.Producers import file_part_producer
from ZServer.Producers import file_close_producer
from ZServer.Producers import iterator_producer
from ZServer.DebugLogger import log
class ZServerHTTPResponse(HTTPResponse):
......@@ -39,38 +48,35 @@ class ZServerHTTPResponse(HTTPResponse):
# Set this value to 1 if streaming output in
# HTTP/1.1 should use chunked encoding
http_chunk=1
http_chunk_size=1024
http_chunk = 1
http_chunk_size = 1024
# defaults
_http_version='1.0'
_http_connection='close'
_server_version='Zope/2.0 ZServer/2.0'
_http_version = '1.0'
_http_connection = 'close'
_server_version = 'Zope/2.0 ZServer/2.0'
# using streaming response
_streaming=0
_streaming = 0
# using chunking transfer-encoding
_chunking=0
_chunking = 0
_bodyproducer = None
def __str__(self,
html_search=re.compile('<html>',re.I).search,
):
def __str__(self):
if self._wrote:
if self._chunking:
return '0\r\n\r\n'
else:
return ''
headers=self.headers
body=self.body
headers = self.headers
body = self.body
# set 204 (no content) status if 200 and response is empty
# and not streaming
if not headers.has_key('content-type') and \
not headers.has_key('content-length') and \
not self._streaming and \
self.status == 200:
if ('content-type' not in headers and
'content-length' not in headers and
not self._streaming and self.status == 200):
self.setStatus('nocontent')
if self.status in (100, 101, 102, 204, 304):
......@@ -84,65 +90,65 @@ class ZServerHTTPResponse(HTTPResponse):
elif not headers.has_key('content-length') and not self._streaming:
self.setHeader('content-length', len(body))
headersl=[]
append=headersl.append
chunks = []
append = chunks.append
status=headers.get('status', '200 OK')
# status header must come first.
append("HTTP/%s %s" % (self._http_version or '1.0' , status))
if headers.has_key('status'):
del headers['status']
append("HTTP/%s %d %s" % (self._http_version or '1.0',
self.status, self.errmsg))
# add zserver headers
append('Server: %s' % self._server_version)
append('Date: %s' % build_http_date(time.time()))
if self._http_version=='1.0':
if self._http_connection=='keep-alive':
if self._http_version == '1.0':
if self._http_connection == 'keep-alive':
self.setHeader('Connection','Keep-Alive')
else:
self.setHeader('Connection','close')
# Close the connection if we have been asked to.
# Use chunking if streaming output.
if self._http_version=='1.1':
if self._http_connection=='close':
if self._http_version == '1.1':
if self._http_connection == 'close':
self.setHeader('Connection','close')
elif (not self.headers.has_key('content-length') and
self.http_chunk and self._streaming):
self.setHeader('Transfer-Encoding','chunked')
self._chunking=1
self._chunking = 1
headers = headers.items()
for line in self.accumulated_headers.splitlines():
for line in self.accumulated_headers:
if line[0] == '\t':
headers[-1][1] += '\n' + line
continue
headers.append(line.split(': ', 1))
for key, val in headers:
if key.lower()==key:
if key.lower() == key:
# only change non-literal header names
key="%s%s" % (key[:1].upper(), key[1:])
start=0
l=key.find('-',start)
key = "%s%s" % (key[:1].upper(), key[1:])
start = 0
l = key.find('-',start)
while l >= start:
key="%s-%s%s" % (key[:l],key[l+1:l+2].upper(),key[l+2:])
start=l+1
l=key.find('-',start)
key = "%s-%s%s" % (key[:l],
key[l+1:l+2].upper(),
key[l+2:])
start = l + 1
l = key.find('-', start)
val = val.replace('\n\t', '\r\n\t')
append("%s: %s" % (key, val))
if self.cookies:
headersl.extend(self._cookie_list())
chunks.extend(self._cookie_list())
append('')
append(body)
return "\r\n".join(headersl)
return "\r\n".join(chunks)
_tempfile=None
_templock=None
_tempstart=0
_tempfile = None
_templock = None
_tempstart = 0
def write(self,data):
"""\
......@@ -164,45 +170,44 @@ class ZServerHTTPResponse(HTTPResponse):
if type(data) != type(''):
raise TypeError('Value must be a string')
stdout=self.stdout
stdout = self.stdout
if not self._wrote:
notify(PubBeforeStreaming(self))
l=self.headers.get('content-length', None)
l = self.headers.get('content-length', None)
if l is not None:
try:
if type(l) is type(''): l=int(l)
if type(l) is type(''): l = int(l)
if l > 128000:
self._tempfile=tempfile.TemporaryFile()
self._templock=thread.allocate_lock()
self._tempfile = tempfile.TemporaryFile()
self._templock = thread.allocate_lock()
except: pass
self._streaming=1
self._streaming = 1
stdout.write(str(self))
self._wrote=1
self._wrote = 1
if not data: return
if self._chunking:
data = '%x\r\n%s\r\n' % (len(data),data)
l=len(data)
l = len(data)
t=self._tempfile
t = self._tempfile
if t is None or l<200:
stdout.write(data)
else:
b=self._tempstart
e=b+l
b = self._tempstart
e = b + l
self._templock.acquire()
try:
t.seek(b)
t.write(data)
finally:
self._templock.release()
self._tempstart=e
self._tempstart = e
stdout.write(file_part_producer(t,self._templock,b,e), l)
_retried_response = None
......@@ -214,18 +219,18 @@ class ZServerHTTPResponse(HTTPResponse):
finally:
self._retried_response = None
return
stdout=self.stdout
stdout = self.stdout
t=self._tempfile
t = self._tempfile
if t is not None:
stdout.write(file_close_producer(t), 0)
self._tempfile=None
self._tempfile = None
stdout.finish(self)
stdout.close()
self.stdout=None # need to break cycle?
self._request=None
self.stdout = None # need to break cycle?
self._request = None
def retry(self):
"""Return a request object to be used in a retry attempt
......@@ -234,11 +239,11 @@ class ZServerHTTPResponse(HTTPResponse):
# only stdout stderr were passed to the constructor. OTOH, I
# think that that's all that is ever passed.
response=self.__class__(stdout=self.stdout, stderr=self.stderr)
response.headers=self.headers
response._http_version=self._http_version
response._http_connection=self._http_connection
response._server_version=self._server_version
response = self.__class__(stdout=self.stdout, stderr=self.stderr)
response.headers = self.headers
response._http_version = self._http_version
response._http_connection = self._http_connection
response._server_version = self._server_version
self._retried_response = response
return response
......@@ -272,28 +277,28 @@ class ChannelPipe:
restrict access to channel to the push method only."""
def __init__(self, request):
self._channel=request.channel
self._request=request
self._shutdown=0
self._close=0
self._bytes=0
self._channel = request.channel
self._request = request
self._shutdown = 0
self._close = 0
self._bytes = 0
def write(self, text, l=None):
if self._channel.closed:
return
if l is None: l=len(text)
self._bytes=self._bytes + l
if l is None: l = len(text)
self._bytes = self._bytes + l
self._channel.push(text,0)
Wakeup()
def close(self):
DebugLogger.log('A', id(self._request),
log('A', id(self._request),
'%s %s' % (self._request.reply_code, self._bytes))
if not self._channel.closed:
self._channel.push(LoggingProducer(self._request, self._bytes), 0)
self._channel.push(CallbackProducer(self._channel.done), 0)
self._channel.push(CallbackProducer(
lambda t=('E', id(self._request)): apply(DebugLogger.log, t)), 0)
lambda t=('E', id(self._request)): apply(log, t)), 0)
if self._shutdown:
self._channel.push(ShutdownProducer(), 0)
Wakeup()
......@@ -304,15 +309,15 @@ class ChannelPipe:
# channel closed too soon
self._request.log(self._bytes)
DebugLogger.log('E', id(self._request))
log('E', id(self._request))
if self._shutdown:
Wakeup(lambda: asyncore.close_all())
else:
Wakeup()
self._channel=None #need to break cycles?
self._request=None
self._channel = None #need to break cycles?
self._request = None
def flush(self): pass # yeah, whatever
......@@ -321,8 +326,8 @@ class ChannelPipe:
self._shutdown = 1
if response.headers.get('connection','') == 'close' or \
response.headers.get('Connection','') == 'close':
self._close=1
self._request.reply_code=response.status
self._close = 1
self._request.reply_code = response.status
def start_response(self, status, headers, exc_info=None):
# Used for WSGI
......@@ -342,9 +347,10 @@ def make_response(request, headers):
"Simple http response factory"
# should this be integrated into the HTTPResponse constructor?
response=ZServerHTTPResponse(stdout=ChannelPipe(request), stderr=StringIO())
response._http_version=request.version
if request.version=='1.0' and is_proxying_match(request.request):
response = ZServerHTTPResponse(stdout=ChannelPipe(request),
stderr=StringIO())
response._http_version = request.version
if request.version == '1.0' and is_proxying_match(request.request):
# a request that was made as if this zope was an http 1.0 proxy.
# that means we have to use some slightly different http
# headers to manage persistent connections.
......@@ -354,5 +360,5 @@ def make_response(request, headers):
connection_re = http_server.CONNECTION
response._http_connection = http_server.get_header(connection_re,
request.header).lower()
response._server_version=request.channel.server.SERVER_IDENT
response._server_version = request.channel.server.SERVER_IDENT
return response
......@@ -111,7 +111,9 @@ class ZServerHTTPResponseTestCase(unittest.TestCase):
'Title-Cased': 'bar',
'mixed-CasED': 'spam',
'multilined': 'eggs\n\tham'}
response.accumulated_headers = 'foo-bar: bar\n\tbaz\nFoo-bar: monty\n'
response.accumulated_headers = ['foo-bar: bar',
'\tbaz',
'Foo-bar: monty']
response.cookies = dict(foo=dict(value='bar'))
response.body = 'A body\nwith multiple lines\n'
......
from Zope2.Startup.run import configure
from Zope2 import startup
configure('<<INSTANCE_HOME>>/etc/zope.conf')
startup()
# mod_wsgi looks for the special name 'application'.
from ZPublisher.WSGIPublisher import publish_module as application
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment