Commit fb57726c authored by Kazuhiko Shiozaki's avatar Kazuhiko Shiozaki

ERP5Type/patches: use the first entry of HTTP_X_FORWARDED_FOR as the source IP address.

This functionality is enabled only if 'trusted-proxies' configuration
value is ('0.0.0.0',) by putting the following in zope.conf :

trusted-proxy 0.0.0.0
parent 1fba1fb6
# -*- coding: utf-8 -*-
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPRequest import HTTPRequest, sane_environment
import ZPublisher.HTTPRequest
import sys
import weakref
import thread
HTTPRequest__init__ = HTTPRequest.__init__
def __init__(self, *args, **kw):
HTTPRequest__init__(self, *args, **kw)
def __init__(self, stdin, environ, response, clean=0):
if ZPublisher.HTTPRequest.trusted_proxies == ('0.0.0.0',): # Magic value to enable this functionality
# Frontend-facing proxy is responsible for sanitising
# HTTP_X_FORWARDED_FOR, and only trusted accesses should bypass
# that proxy. So trust first entry.
forwarded_for = (
sane_environment(environ)
if not clean else
environ
).get('HTTP_X_FORWARDED_FOR', '').split(',', 1)[0].strip()
else:
forwarded_for = None
HTTPRequest__init__(self, stdin=stdin, environ=environ, response=response, clean=clean)
import erp5.component
erp5.component.ref_manager.add_request(self)
if forwarded_for:
self._client_addr = forwarded_for
HTTPRequest.__init__ = __init__
......@@ -21,23 +21,23 @@
from ZServer.medusa.http_server import http_request
from ZPublisher.HTTPRequest import trusted_proxies
import ZPublisher.HTTPRequest
import string
import base64
import time
from urllib import quote
def log (self, bytes):
# The purpose of this patch is to emit original IP addresses in Z2.log
# even when a reverse proxy is used. A similar thing is implemented
# in the ZPublisher, but not in the ZServer.
addr = self.channel.addr[0]
# Frontend-facing proxy is responsible for sanitising
# HTTP_X_FORWARDED_FOR, and only trusted accesses should bypass
# that proxy. So trust first entry.
#
# <patch>
addr = self.channel.addr[0]
if trusted_proxies and addr in trusted_proxies:
original_addr = self.get_header('x-forwarded-for')
if original_addr:
addr = original_addr.split(', ')[0]
if ZPublisher.HTTPRequest.trusted_proxies == ('0.0.0.0',): # Magic value to enable this functionality
forwarded_for = (self.get_header('x-forwarded-for') or '').split(',', 1)[0].strip()
if forwarded_for:
addr = forwarded_for
# </patch>
user_agent=self.get_header('user-agent')
......
......@@ -33,6 +33,8 @@ except ImportError: # BBB: ZODB < 4
import unittest
import sys
import mock
import os
import requests
import transaction
from random import randint
......@@ -53,6 +55,7 @@ from AccessControl.ZopeGuards import guarded_getattr, guarded_hasattr
from Products.ERP5Type.tests.utils import createZODBPythonScript
from Products.ERP5Type.tests.utils import removeZODBPythonScript
from Products.ERP5Type import Permissions
from Products.ERP5Type.tests.runUnitTest import log_directory
class PropertySheetTestCase(ERP5TypeTestCase):
"""Base test case class for property sheets tests.
......@@ -3317,6 +3320,79 @@ return [
'<Organisation at /%s/organisation_module/organisation_id>' % self.portal.getId(),
repr(document))
def test_request_with_x_forwarded_for(self):
script_container = self.portal.portal_skins.custom
script_id = 'ERP5Site_getClientAddr'
createZODBPythonScript(script_container, script_id, '', 'return context.REQUEST.getClientAddr()')
self.commit()
z2_log_path = os.path.join(log_directory, 'Z2.log')
import ZPublisher.HTTPRequest
# test without configuration
ZPublisher.HTTPRequest.trusted_proxies = []
response = requests.get(
'%s/%s' % (self.portal.absolute_url(), script_id),
headers={'X-Forwarded-For': '1.2.3.4'},
)
self.assertNotEqual(response.text, '1.2.3.4')
f = open(z2_log_path, 'rb')
f.seek(-256, os.SEEK_END) # Assumes last line is not longer than 256 chars (it should be about 130)
last_line = f.readlines()[-1]
f.close()
self.assertFalse(last_line.startswith('1.2.3.4 - '), last_line)
response = requests.get(
'%s/%s' % (self.portal.absolute_url(), script_id),
headers={'X-Forwarded-For': '1.2.3.4, 5.6.7.8'},
)
self.assertNotEqual(response.text, '1.2.3.4')
self.assertNotEqual(response.text, '5.6.7.8')
f = open(z2_log_path, 'rb')
f.seek(-256, os.SEEK_END)
last_line = f.readlines()[-1]
f.close()
self.assertFalse(last_line.startswith('1.2.3.4 - '), last_line)
self.assertFalse(last_line.startswith('5.6.7.8 - '), last_line)
response = requests.get(
'%s/%s' % (self.portal.absolute_url(), script_id),
)
self.assertNotEqual(response.text, '1.2.3.4')
f = open(z2_log_path, 'rb')
f.seek(-256, os.SEEK_END)
last_line = f.readlines()[-1]
f.close()
self.assertFalse(last_line.startswith('1.2.3.4 - '), last_line)
# test with configuration
ZPublisher.HTTPRequest.trusted_proxies = ('0.0.0.0',)
response = requests.get(
'%s/%s' % (self.portal.absolute_url(), script_id),
headers={'X-Forwarded-For': '1.2.3.4'},
)
self.assertEqual(response.text, '1.2.3.4')
f = open(z2_log_path, 'rb')
f.seek(-256, os.SEEK_END)
last_line = f.readlines()[-1]
f.close()
self.assertTrue(last_line.startswith('1.2.3.4 - '), last_line)
response = requests.get(
'%s/%s' % (self.portal.absolute_url(), script_id),
headers={'X-Forwarded-For': '1.2.3.4, 5.6.7.8'},
)
self.assertEqual(response.text, '1.2.3.4')
f = open(z2_log_path, 'rb')
f.seek(-256, os.SEEK_END)
last_line = f.readlines()[-1]
f.close()
self.assertTrue(last_line.startswith('1.2.3.4 - '), last_line)
response = requests.get(
'%s/%s' % (self.portal.absolute_url(), script_id),
)
self.assertNotEqual(response.text, '1.2.3.4')
f = open(z2_log_path, 'rb')
f.seek(-256, os.SEEK_END)
last_line = f.readlines()[-1]
f.close()
self.assertFalse(last_line.startswith('1.2.3.4 - '), last_line)
class TestAccessControl(ERP5TypeTestCase):
# Isolate test in a dedicaced class in order not to break other tests
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment