Commit 7cc26c3a authored by Jérome Perrin's avatar Jérome Perrin

stack/erp5: rate limit clients on backend haproxy

This supports defining rate limits as maximum number of matched requests
over a period of time for the same client IP.

Requests are matched either by a regular expression on URL path or a
response status code.

Once a client exceeded the rate limit, haproxy will reply with HTTP
status code 429 ("Too Many Requests") until the expiration period
configured on the rule has passed. Optionally, the responses can be made
after the "tarpit" delay.
parent b929124a
Pipeline #40042 failed with stage
in 0 seconds
......@@ -760,6 +760,112 @@
}
},
"type": "object"
},
"rate-limits": {
"type": "object",
"title": "Request rate limiting, per source IP address",
"properties": {
"rules": {
"type": "array",
"items": {
"oneOf": [
{
"required": [
"url-path-pattern",
"max-requests",
"time-window"
]
},
{
"required": [
"status-code",
"max-requests",
"time-window"
]
}
],
"examples": [
{
"table-name": "limit_unauthorized",
"max-requests": 100,
"time-window": "10m",
"status-code": "401",
"action": "429",
"expire": "10m"
},
{
"table-name": "limit_login",
"max-requests": 10,
"time-window": "30s",
"url-path-pattern": "^/login.*",
"action": "tarpit",
"expire": "1m"
}
],
"properties": {
"max-requests": {
"type": "integer",
"description": "Maximum number of requests during the time window before triggering the action"
},
"time-window": {
"type": "string",
"description": "Time window, in milliseconds, seconds, minutes, hours or days",
"pattern": "^\\d+(ms|s|m|h|d)$"
},
"expire": {
"type": "string",
"description": "Expiration time, in milliseconds, seconds, minutes, hours or days",
"pattern": "^\\d+(ms|s|m|h|d)$"
},
"table-size": {
"type": "string",
"description": "Size of the haproxy stick table. Refer to haproxy `table-size` for details.",
"default": "1m",
"pattern": "^\\d+(k|m|g)$"
},
"table-name": {
"type": "string",
"description": "Name of the haproxy stick table. Useful for inspection and manipulation using API. If set, make sure two rules do not share the same stick table name.",
"pattern": "^[a-zA-Z0-9_-]+$"
},
"url-path-pattern": {
"type": "string",
"description": "If set, rule will only be applied for requests with path matching the regular expression.",
"examples": [
"^/login.*"
]
},
"status-code": {
"type": "string",
"description": "HTTP status codes to consider.",
"pattern": "[\\d :]",
"examples": [
"401 403 404",
"500",
"400:599"
]
},
"action": {
"description": "Action to take, immediately deny the request with HTTP status code 429 or after the `tarpit` delay",
"type": "string",
"default": "deny",
"enum": [
"deny",
"tarpit"
]
}
},
"additionalProperties": false
}
},
"tarpit-duration": {
"type": "string",
"description": "Delay to apply on rate limited responses using `tarpit` action.",
"default": "5s",
"pattern": "^\\d+(ms|s|m|h|d)$"
}
},
"additionalProperties": false
}
},
"type": "object"
......
import datetime
import functools
import ipaddress
import json
import logging
......@@ -9,9 +11,11 @@ import subprocess
import sqlite3
import tempfile
import time
import typing
import urllib.parse
from http.server import BaseHTTPRequestHandler
from unittest import mock
import http
from unittest import expectedFailure, mock
from cryptography import x509
from cryptography.hazmat.backends import default_backend
......@@ -1091,3 +1095,143 @@ class TestPathBasedRouting(BalancerTestCase):
# elements which share a common prefix.
assertRoutingEqual('second', '/next', prefix + '/erp5/web_site_module/the_next_website' + vhr + '/_vh_next')
assertRoutingEqual('second', '/next2', prefix + '/erp5/web_site_module/the_next2_website' + vhr + '/_vh_next2')
class StatusCodeHTTPServer(ManagedHTTPServer):
"""An HTTP Server which replies with the status code passed as path element,
for example, it would reply with 418 for the following requests:
GET /418
because balancer rewrites the URL, the actual URL used by this server is:
GET /VirtualHostBase/https/{host}/VirtualHostRoot/418
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
path_elements = [p for p in self.path.split('/') if p]
if path_elements[:1] == ['VirtualHostBase']:
path_elements = path_elements[4:]
if not path_elements:
path_elements = ['200']
status = int(path_elements[0])
self.send_response(status)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(http.HTTPStatus(status).phrase.encode() + b"\n")
def log_message(self, format: str, *args) -> None:
logging.getLogger(__name__ + '.StatusCodeHTTPServer').info(format, *args)
class TestRateLimiting(BalancerTestCase):
__partition_reference__ = 'rl'
@classmethod
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
# use our server with status code control instead
parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("status_code_server", StatusCodeHTTPServer).netloc, 1, False]]
# and set some rate limiting rules:
parameter_dict["rate-limits"] = {
"tarpit-duration": "5s",
"rules": [
# one based on the path
{
"max-requests": 3,
"time-window": "20s",
"url-path-pattern": "/200/.*limited",
"action": "tarpit",
"expire": "20s"
},
# one based on the HTTP status code
{
"max-requests": 5,
"time-window": "10s",
"status-code": "400:599",
"table-name": "errors",
"expire": "10s"
},
],
}
# we'll connect to the backend with a certificate, so that the backend trusts our
# X-Forwarded-For header and that we can simulate multiple clients from different
# source IPs.
frontend_caucase = cls.getManagedResource('frontend_caucase', CaucaseService)
certificate = cls.getManagedResource('client_certificate', CaucaseCertificate)
certificate.request('shared frontend', frontend_caucase)
parameter_dict['ssl']['frontend-caucase-url-list'] = [frontend_caucase.url]
return parameter_dict
def tearDown(self):
# restart haproxy between tests to reset the stick tables
with self.slap.instance_supervisor_rpc as supervisor:
info, = [i for i in
supervisor.getAllProcessInfo() if i['name'].startswith('haproxy-')]
haproxy_process_name = f"{info['group']}:{info['name']}"
supervisor.stopProcess(haproxy_process_name)
supervisor.startProcess(haproxy_process_name)
self.slap.waitForInstance()
def do_get(self, url_path:str, client_ip:typing.Union[str, None] = None) -> requests.Response:
default_balancer_url = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])['url-backend-default']
client_certificate = self.getManagedResource('client_certificate', CaucaseCertificate)
headers = {}
cert = None
if client_ip:
headers['X-Forwarded-For'] = client_ip
cert = (client_certificate.cert_file, client_certificate.key_file)
return requests.get(default_balancer_url + url_path, verify=False, headers=headers, cert=cert)
def test_backend_rate_limiting_per_url(self) -> None:
for client_ip in ('1.2.3.4', '::1', None):
with self.subTest(client_ip):
for _ in range(3):
self.do_get('/200/rate_limited', client_ip).raise_for_status()
limited_request = self.do_get('/200/rate_limited', client_ip)
self.assertEqual(limited_request.status_code, requests.codes.too_many_requests)
self.assertGreater(limited_request.elapsed , datetime.timedelta(seconds=5))
self.do_get('/200/other_url', client_ip).raise_for_status()
def test_backend_rate_limiting_per_status_code(self) -> None:
for client_ip in ('1.2.3.4', '::1', None):
with self.subTest(client_ip):
self.assertEqual(self.do_get('/400', client_ip).status_code, 400)
self.assertEqual(self.do_get('/401', client_ip).status_code, 401)
self.assertEqual(self.do_get('/404', client_ip).status_code, 404)
# status codes 2* and 3* do not increase the counter
self.assertEqual(self.do_get('/200', client_ip).status_code, 200)
self.assertEqual(self.do_get('/302', client_ip).status_code, 302)
self.assertEqual(self.do_get('/500', client_ip).status_code, 500)
self.assertEqual(self.do_get('/500', client_ip).status_code, 500)
limited_request = self.do_get('/200', client_ip)
self.assertEqual(limited_request.status_code, requests.codes.too_many_requests)
self.do_get('/500', '4.5.6.7')
self.assertIn(
'key=4.5.6.7',
subprocess.check_output(
self.computer_partition_root_path / 'bin' / 'haproxy-socat-stats',
input='show table stick_table_errors\n',
text=True,
)
)
@expectedFailure
def test_status_code_only_track_matching_status_code(self):
self.do_get('/200', '1.2.3.4')
self.assertNotIn(
'key=1.2.3.4',
subprocess.check_output(
self.computer_partition_root_path / 'bin' / 'haproxy-socat-stats',
input='show table stick_table_errors\n',
text=True,
)
)
......@@ -94,11 +94,11 @@ md5sum = 9c580be982d8c63ec06fc273ef3cb971
[template-balancer]
filename = instance-balancer.cfg.in
md5sum = e9aa89754085bdc7a6fb9e53c0c97f9d
md5sum = 6cb40415c70f147c07cabad15b159629
[template-haproxy-cfg]
filename = haproxy.cfg.in
md5sum = 2cd76971b64b0bf7771978ad07bfc2e5
md5sum = 5f102a2144118cf16a8d149381710706
[template-rsyslogd-cfg]
filename = rsyslogd.cfg.in
......
......@@ -90,6 +90,20 @@
# ]
# },
#
# # rate limits
# "rate-limits": {
# "tarpit-duration": "10s",
# "rules": [
# {
# "max-requests": 3,
# "time-window": "20s",
# "url-path-pattern": "/200/.*limited",
# "action": "tarpit",
# "expire": "20s"
# },
# ],
# },
#
# # The mapping of zope paths.
# # This is a Zope specific feature used only to provide https while running
# # ERP5 "unit test" suite.
......@@ -151,6 +165,9 @@ global
stats socket {{ parameter_dict['stats-socket'] }} level admin
{% if len(parameter_dict['rate-limits']['rules']) > 3 %}
tune.stick-counters {{ len(parameter_dict['rate-limits']['rules']) }}
{% endif %}
defaults
mode http
......@@ -190,11 +207,15 @@ listen {{ name }}
bind {{ parameter_dict['ipv4'] }}:{{ frontend['port'] }} {{ bind_ssl_crt }} {{ ssl_auth }}
bind {{ parameter_dict['ipv6'] }}:{{ frontend['port'] }} {{ bind_ssl_crt }} {{ ssl_auth }}
acl acl_verified_frontend ssl_c_used ssl_c_verify 0
http-request set-var(txn.client_real_ip) req.hdr(X-Forwarded-For) if acl_verified_frontend
http-request set-var(txn.client_real_ip) src unless acl_verified_frontend
# remove X-Forwarded-For unless client presented a verified certificate
http-request del-header X-Forwarded-For unless { ssl_c_verify 0 } { ssl_c_used 1 }
http-request del-header X-Forwarded-For unless acl_verified_frontend
# set Remote-User if client presented a verified certificate
http-request del-header Remote-User
http-request set-header Remote-User %{+Q}[ssl_c_s_dn(cn)] if { ssl_c_verify 0 } { ssl_c_used 1 }
http-request set-header Remote-User %{+Q}[ssl_c_s_dn(cn)] if acl_verified_frontend
# reject invalid host header before using it in path
http-request deny deny_status 400 if { req.hdr(host) -m sub / }
......@@ -231,6 +252,29 @@ backend {{ name }}
timeout server {{ backend['timeout'] + 3 }}s
{%- endif %}
timeout tarpit {{ parameter_dict['rate-limits']['tarpit-duration'] }}
{%- for rule in parameter_dict['rate-limits']['rules'] %}
{%- if rule['url-path-pattern'] is defined %}
acl acl_rate_limit_{{ rule['table-name'] }}_url_match path_reg {{ rule['url-path-pattern'] }}
acl acl_rate_limit_{{ rule['table-name'] }}_url_rate_exceeded sc{{ loop.index0 }}_http_req_rate(stick_table_{{ rule['table-name'] }}) gt {{ rule['max-requests'] }}
{%- else %}
acl acl_rate_limit_{{ rule['table-name'] }}_url_match always_true
acl acl_rate_limit_{{ rule['table-name'] }}_url_rate_exceeded always_true
{%- endif %}
{%- if rule['status-code'] is defined %}
acl acl_rate_limit_{{ rule['table-name'] }}_status_code_hit status {{ rule['status-code'] }}
acl acl_rate_limit_{{ rule['table-name'] }}_status_code_exceeded sc{{ loop.index0 }}_gpc0_rate(stick_table_{{ rule['table-name'] }}) ge {{ rule['max-requests'] }}
http-response sc-inc-gpc0({{ loop.index0 }}) if acl_rate_limit_{{ rule['table-name'] }}_status_code_hit
{%- else %}
acl acl_rate_limit_{{ rule['table-name'] }}_status_code_exceeded always_true
{%- endif %}
http-request track-sc{{ loop.index0 }} var(txn.client_real_ip) table stick_table_{{ rule['table-name'] }} if acl_rate_limit_{{ rule['table-name'] }}_url_match
acl acl_rate_limit_{{ rule['table-name'] }}_url_rate_exceeded sc{{ loop.index0 }}_http_req_rate(stick_table_{{ rule['table-name'] }}) gt {{ rule['max-requests'] }}
http-request {{ rule['action'] }} deny_status 429 if acl_rate_limit_{{ rule['table-name'] }}_url_rate_exceeded acl_rate_limit_{{ rule['table-name'] }}_url_match acl_rate_limit_{{ rule['table-name'] }}_status_code_exceeded
{% endfor %}
{% set has_webdav = [] -%}
{% for address, connection_count, webdav in backend['backend-list'] -%}
{% if webdav %}{% do has_webdav.append(None) %}{% endif -%}
......@@ -266,3 +310,11 @@ backend backend_{{ group_name }}_{{ name }}
server {{ name }} {{ urllib_parse.urlparse(url).netloc }}
{%- endfor %}
{% endfor %}
{%- for rule in parameter_dict['rate-limits']['rules'] %}
backend stick_table_{{ rule['table-name'] }}
stick-table type string size {{ rule['table-size'] }} expire {{ rule['expire'] }} store
{%- if rule['url-path-pattern'] is defined %} http_req_rate({{ rule['time-window'] }}) {% endif %}
{%- if rule['status-code'] is defined %} gpc0_rate({{ rule['time-window'] }}) {% endif %}
{% endfor %}
......@@ -322,6 +322,15 @@ init =
options['frontend-dict'] = frontend_dict
options['zope-virtualhost-monster-backend-dict'] = zope_virtualhost_monster_backend_dict
rate_limits = slapparameter_dict.get('rate-limits', {})
rate_limits.setdefault('tarpit-duration', '5s')
rate_limits.setdefault('rules', [])
for idx, rule in enumerate(rate_limits['rules']):
rule.setdefault('table-name', str(idx))
rule.setdefault('table-size', '1m')
rule.setdefault('action', 'deny')
options['rate-limits'] = rate_limits
if port_dict != previous_port_dict:
with open(options['ports-state-file'] + '.tmp', 'w') as f:
json.dump(port_dict, f, indent=True)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment