From dc0e162219bffbf48e1612d021c45d5c30d5062d Mon Sep 17 00:00:00 2001 From: Pavel Dedik Date: Tue, 24 Mar 2026 14:21:36 +0100 Subject: [PATCH] Add PerRequestCacheThrottlingValidator --- example/apps/test_security/tests/__init__.py | 1 + .../apps/test_security/tests/throttling.py | 75 +++++++++++++++++++ security/throttling/validators.py | 26 +++++++ 3 files changed, 102 insertions(+) create mode 100644 example/apps/test_security/tests/throttling.py diff --git a/example/apps/test_security/tests/__init__.py b/example/apps/test_security/tests/__init__.py index 77fc389d..bc282558 100644 --- a/example/apps/test_security/tests/__init__.py +++ b/example/apps/test_security/tests/__init__.py @@ -4,4 +4,5 @@ from .input_request_log import InputRequestLogTestCase from .output_request_log import OutputRequestLogTestCase from .partitioned_log import PartitionedLogTestCase +from .throttling import PerRequestCacheThrottlingValidatorTestCase from .utils import UtilsTestCase diff --git a/example/apps/test_security/tests/throttling.py b/example/apps/test_security/tests/throttling.py new file mode 100644 index 00000000..40439a3c --- /dev/null +++ b/example/apps/test_security/tests/throttling.py @@ -0,0 +1,75 @@ +from django.core.cache import cache +from django.test import RequestFactory, TestCase, override_settings + +from germanium.tools import assert_false, assert_raises, assert_true + +from security.throttling.exception import ThrottlingException +from security.throttling.validators import PerRequestCacheThrottlingValidator + + +class PerRequestCacheThrottlingValidatorTestCase(TestCase): + + def setUp(self): + cache.clear() + self.factory = RequestFactory() + + def _make_request(self, path='/test/', method='GET', ip='127.0.0.1'): + request = self.factory.generic(method, path) + request.META['REMOTE_ADDR'] = ip + return request + + @override_settings(SECURITY_THROTTLING_ENABLED=True) + def test_requests_under_limit_should_pass(self): + validator = PerRequestCacheThrottlingValidator(60, 3) + request = self._make_request() + assert_true(validator._validate(request)) + assert_true(validator._validate(request)) + assert_true(validator._validate(request)) + + @override_settings(SECURITY_THROTTLING_ENABLED=True) + def test_requests_over_limit_should_fail(self): + validator = PerRequestCacheThrottlingValidator(60, 3) + request = self._make_request() + validator._validate(request) + validator._validate(request) + validator._validate(request) + assert_false(validator._validate(request)) + + @override_settings(SECURITY_THROTTLING_ENABLED=True) + def test_validate_raises_throttling_exception_when_over_limit(self): + validator = PerRequestCacheThrottlingValidator(60, 2) + request = self._make_request() + validator.validate(request) + validator.validate(request) + with assert_raises(ThrottlingException): + validator.validate(request) + + @override_settings(SECURITY_THROTTLING_ENABLED=True) + def test_different_ips_are_counted_independently(self): + validator = PerRequestCacheThrottlingValidator(60, 2) + request_a = self._make_request(ip='10.0.0.1') + request_b = self._make_request(ip='10.0.0.2') + validator._validate(request_a) + validator._validate(request_a) + # IP A is at limit + assert_false(validator._validate(request_a)) + # IP B is independent + assert_true(validator._validate(request_b)) + + @override_settings(SECURITY_THROTTLING_ENABLED=True) + def test_different_paths_are_counted_independently(self): + validator = PerRequestCacheThrottlingValidator(60, 2) + request_a = self._make_request(path='/path-a/') + request_b = self._make_request(path='/path-b/') + validator._validate(request_a) + validator._validate(request_a) + assert_false(validator._validate(request_a)) + assert_true(validator._validate(request_b)) + + @override_settings(SECURITY_THROTTLING_ENABLED=False) + def test_throttling_disabled_always_passes(self): + validator = PerRequestCacheThrottlingValidator(60, 1) + request = self._make_request() + validator.validate(request) + validator.validate(request) + validator.validate(request) diff --git a/security/throttling/validators.py b/security/throttling/validators.py index f72d656d..155e811f 100644 --- a/security/throttling/validators.py +++ b/security/throttling/validators.py @@ -1,5 +1,8 @@ +import hashlib +import time from datetime import timedelta +from django.core.cache import cache from django.utils import timezone from django.utils.translation import gettext_lazy as _ from django.urls import resolve, Resolver404 @@ -55,6 +58,29 @@ def _validate(self, request): ) < self.throttle_at +class PerRequestCacheThrottlingValidator(PerRequestThrottlingValidator): + """ + Throttling validator that counts requests using Django's cache backend. + Unlike PerRequestThrottlingValidator, this doesn't rely on async input request logs, + making throttling effective immediately. + """ + + def _get_cache_key(self, request): + ip = get_client_ip(request)[0] or '' + window = int(time.time() / self.timeframe) + raw = '{}:{}:{}:{}'.format(ip, request.method.upper(), request.path, window) + return 'throttle:req:{}'.format(hashlib.md5(raw.encode()).hexdigest()) + + def _validate(self, request): + key = self._get_cache_key(request) + try: + count = cache.incr(key) + except ValueError: + cache.set(key, 1, timeout=self.timeframe) + count = 1 + return count <= self.throttle_at + + class LoginThrottlingValidator(ThrottlingValidator): slug = None