Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example/apps/test_security/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
from .input_request_log import InputRequestLogTestCase
from .output_request_log import OutputRequestLogTestCase
from .partitioned_log import PartitionedLogTestCase
from .throttling import PerRequestCacheThrottlingValidatorTestCase
from .utils import UtilsTestCase
75 changes: 75 additions & 0 deletions example/apps/test_security/tests/throttling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from django.core.cache import cache
from django.test import RequestFactory, TestCase, override_settings

from germanium.tools import assert_false, assert_raises, assert_true

from security.throttling.exception import ThrottlingException
from security.throttling.validators import PerRequestCacheThrottlingValidator


class PerRequestCacheThrottlingValidatorTestCase(TestCase):

def setUp(self):
cache.clear()
self.factory = RequestFactory()

def _make_request(self, path='/test/', method='GET', ip='127.0.0.1'):
request = self.factory.generic(method, path)
request.META['REMOTE_ADDR'] = ip
return request

@override_settings(SECURITY_THROTTLING_ENABLED=True)
def test_requests_under_limit_should_pass(self):
validator = PerRequestCacheThrottlingValidator(60, 3)
request = self._make_request()
assert_true(validator._validate(request))
assert_true(validator._validate(request))
assert_true(validator._validate(request))

@override_settings(SECURITY_THROTTLING_ENABLED=True)
def test_requests_over_limit_should_fail(self):
validator = PerRequestCacheThrottlingValidator(60, 3)
request = self._make_request()
validator._validate(request)
validator._validate(request)
validator._validate(request)
assert_false(validator._validate(request))

@override_settings(SECURITY_THROTTLING_ENABLED=True)
def test_validate_raises_throttling_exception_when_over_limit(self):
validator = PerRequestCacheThrottlingValidator(60, 2)
request = self._make_request()
validator.validate(request)
validator.validate(request)
with assert_raises(ThrottlingException):
validator.validate(request)

@override_settings(SECURITY_THROTTLING_ENABLED=True)
def test_different_ips_are_counted_independently(self):
validator = PerRequestCacheThrottlingValidator(60, 2)
request_a = self._make_request(ip='10.0.0.1')
request_b = self._make_request(ip='10.0.0.2')
validator._validate(request_a)
validator._validate(request_a)
# IP A is at limit
assert_false(validator._validate(request_a))
# IP B is independent
assert_true(validator._validate(request_b))

@override_settings(SECURITY_THROTTLING_ENABLED=True)
def test_different_paths_are_counted_independently(self):
validator = PerRequestCacheThrottlingValidator(60, 2)
request_a = self._make_request(path='/path-a/')
request_b = self._make_request(path='/path-b/')
validator._validate(request_a)
validator._validate(request_a)
assert_false(validator._validate(request_a))
assert_true(validator._validate(request_b))

@override_settings(SECURITY_THROTTLING_ENABLED=False)
def test_throttling_disabled_always_passes(self):
validator = PerRequestCacheThrottlingValidator(60, 1)
request = self._make_request()
validator.validate(request)
validator.validate(request)
validator.validate(request)
26 changes: 26 additions & 0 deletions security/throttling/validators.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import hashlib
import time
from datetime import timedelta

from django.core.cache import cache
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.urls import resolve, Resolver404
Expand Down Expand Up @@ -55,6 +58,29 @@ def _validate(self, request):
) < self.throttle_at


class PerRequestCacheThrottlingValidator(PerRequestThrottlingValidator):
"""
Throttling validator that counts requests using Django's cache backend.
Unlike PerRequestThrottlingValidator, this doesn't rely on async input request logs,
making throttling effective immediately.
"""

def _get_cache_key(self, request):
ip = get_client_ip(request)[0] or ''
window = int(time.time() / self.timeframe)
raw = '{}:{}:{}:{}'.format(ip, request.method.upper(), request.path, window)
return 'throttle:req:{}'.format(hashlib.md5(raw.encode()).hexdigest())

def _validate(self, request):
key = self._get_cache_key(request)
try:
count = cache.incr(key)
except ValueError:
cache.set(key, 1, timeout=self.timeframe)
count = 1
return count <= self.throttle_at


class LoginThrottlingValidator(ThrottlingValidator):

slug = None
Expand Down
Loading