diff --git a/src/fides/api/app_setup.py b/src/fides/api/app_setup.py index 748e7e86cc4..38edcd6b040 100644 --- a/src/fides/api/app_setup.py +++ b/src/fides/api/app_setup.py @@ -53,6 +53,7 @@ is_rate_limit_enabled, ) from fides.api.util.saas_config_updater import update_saas_configs +from fides.api.util.security_headers import SecurityHeadersMiddleware from fides.config import CONFIG from fides.config.config_proxy import ConfigProxy @@ -87,6 +88,7 @@ def create_fides_app( fastapi_app.state.limiter = fides_limiter # Starlette bug causing this to fail mypy fastapi_app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # type: ignore + fastapi_app.add_middleware(SecurityHeadersMiddleware) for handler in ExceptionHandlers.get_handlers(): # Starlette bug causing this to fail mypy fastapi_app.add_exception_handler(RedisNotConfigured, handler) # type: ignore diff --git a/src/fides/api/util/security_headers.py b/src/fides/api/util/security_headers.py new file mode 100644 index 00000000000..7bce2266b4b --- /dev/null +++ b/src/fides/api/util/security_headers.py @@ -0,0 +1,101 @@ +import re +from dataclasses import dataclass + +from fastapi import Request, Response +from starlette.middleware.base import BaseHTTPMiddleware + +from fides.config import CONFIG + +apply_recommended_headers = CONFIG.security.headers_mode == "recommended" + + +def is_exact_match(matcher: re.Pattern[str], path_name: str) -> bool: + matched_content = re.fullmatch(matcher, path_name) + return matched_content is not None + + +HeaderDefinition = tuple[str, str] + + +@dataclass +class HeaderRule: + matcher: re.Pattern[str] + headers: list[HeaderDefinition] + + +recommended_csp_header_value = re.sub( + r"\s{2,}", + " ", + """ + default-src 'self'; + script-src 'self' 'unsafe-inline'; + style-src 'self' 'unsafe-inline'; + connect-src 'self'; + img-src 'self' blob: data:; + font-src 'self'; + object-src 'none'; + base-uri 'self'; + form-action 'self'; + frame-ancestors 'self'; + upgrade-insecure-requests; + """, +).strip() + +recommended_headers: list[HeaderRule] = [ + HeaderRule( + matcher=re.compile(r"/.*"), + headers=[ + ("X-Content-Type-Options", "nosniff"), + ("Strict-Transport-Security", "max-age=31536000"), + ], + ), + HeaderRule( + matcher=re.compile(r"^/((?!api|health).*)"), + headers=[ + ( + "Content-Security-Policy", + recommended_csp_header_value, + ), + ("X-Frame-Options", "SAMEORIGIN"), + ], + ), +] + + +def get_applicable_header_rules( + path: str, header_rules: list[HeaderRule] +) -> list[HeaderDefinition]: + header_names: set[str] = set() + header_definitions: list[HeaderDefinition] = [] + + for rule in header_rules: + if is_exact_match(rule.matcher, path): + for header in rule.headers: + [header_name, _] = header + if header_name not in header_names: + header_names.add(header_name) + header_definitions.append(header) + + return header_definitions + + +def apply_headers_to_response( + headers: list[HeaderRule], request: Request, response: Response +) -> None: + applicable_headers = get_applicable_header_rules(request.url.path, headers) + for [header_name, header_value] in applicable_headers: + response.headers.append(header_name, header_value) + + +class SecurityHeadersMiddleware(BaseHTTPMiddleware): + """ + Controls what security headers are included in Fides API responses + """ + + async def dispatch(self, request: Request, call_next): # type: ignore + response = await call_next(request) + + if apply_recommended_headers: + apply_headers_to_response(recommended_headers, request, response) + + return response diff --git a/src/fides/config/security_settings.py b/src/fides/config/security_settings.py index f62cf61708a..33c2642b7ce 100644 --- a/src/fides/config/security_settings.py +++ b/src/fides/config/security_settings.py @@ -1,7 +1,7 @@ """This module handles finding and parsing fides configuration files.""" # pylint: disable=C0115,C0116, E0213 -from typing import List, Optional, Pattern, Tuple, Union +from typing import List, Literal, Optional, Pattern, Tuple, Union from pydantic import Field, SerializeAsAny, ValidationInfo, field_validator from pydantic_settings import SettingsConfigDict @@ -97,6 +97,10 @@ class SecuritySettings(FidesSettings): default=None, description="The header used to determine the client IP address for rate limiting. If not set or set to empty string, rate limiting will be disabled.", ) + headers_mode: Union[Literal["none"], Literal["recommended"]] = Field( + default="none", + description="Controls what security headers are included in Fides server responses.", + ) request_rate_limit: str = Field( default="2000/minute", description="The number of requests from a single IP address allowed to hit an endpoint within a rolling 60 second period.", diff --git a/tests/api/util/test_security_headers.py b/tests/api/util/test_security_headers.py new file mode 100644 index 00000000000..761c34d0505 --- /dev/null +++ b/tests/api/util/test_security_headers.py @@ -0,0 +1,110 @@ +import re +from unittest import mock + +import pytest +from fastapi import Request, Response + +from fides.api.util.security_headers import ( + HeaderRule, + apply_headers_to_response, + get_applicable_header_rules, + is_exact_match, + recommended_csp_header_value, + recommended_headers, +) + + +class TestSecurityHeaders: + @pytest.mark.parametrize( + "pattern,path,expected", + [ + (r"\/example-path", "/example-path", True), + (r"\/example-path", "/example-path/with-more-content", False), + (r"\/example-path/?(.*)", "/example-path/with-more-content", True), + (r"\/example-path", "/anti-example-path", False), + (r"\/example-path", "/completely-disparate-no-match", False), + ], + ) + def test_is_exact_match(self, pattern, path, expected): + assert (is_exact_match(re.compile(pattern), path)) is expected + + def test_get_applicable_header_rules_returns_first_matching_rule_for_path(self): + expected_headers: tuple[str, str] = ("header-1", "value-1") + headers: list[HeaderRule] = [ + HeaderRule(re.compile(r"\/a"), [expected_headers]), + HeaderRule(re.compile(r"\/a"), [("header-1", "value-2")]), + ] + + assert get_applicable_header_rules("/a", headers) == [expected_headers] + + def test_get_applicable_header_rules_returns_disparate_headers(self): + header1 = "header-1" + header2 = "header-2" + headers1: tuple[str, str] = (header1, "value-1") + headers2: tuple[str, str] = (header2, "value-2") + headers: list[HeaderRule] = [ + HeaderRule(re.compile(r"\/a-path"), [headers1]), + HeaderRule(re.compile(r"\/a-path"), [headers2]), + ] + + assert get_applicable_header_rules("/a-path", headers) == [headers1, headers2] + + def test_apply_headers_to_response(self): + header = ("header-1", "value-1") + header_rules: list[HeaderRule] = [HeaderRule(re.compile(r".*"), [header])] + + mock_request = mock.Mock(spec=Request) + mock_request.url.path = "/any-path" + + response = Response() + + apply_headers_to_response(header_rules, mock_request, response) + + assert header in response.headers.items() + + @pytest.mark.parametrize( + "path,expected", + [ + ( + "/api/foo", + [ + ("X-Content-Type-Options", "nosniff"), + ("Strict-Transport-Security", "max-age=31536000"), + ], + ), + ( + "/health", + [ + ("X-Content-Type-Options", "nosniff"), + ("Strict-Transport-Security", "max-age=31536000"), + ], + ), + ( + "/privacy-requests", + [ + ("X-Content-Type-Options", "nosniff"), + ("Strict-Transport-Security", "max-age=31536000"), + ( + "Content-Security-Policy", + recommended_csp_header_value, + ), + ("X-Frame-Options", "SAMEORIGIN"), + ], + ), + ( + "/", + [ + ("X-Content-Type-Options", "nosniff"), + ("Strict-Transport-Security", "max-age=31536000"), + ( + "Content-Security-Policy", + recommended_csp_header_value, + ), + ("X-Frame-Options", "SAMEORIGIN"), + ], + ), + ], + ) + def test_recommended_headers_api_route(self, path, expected): + applicable_rules = get_applicable_header_rules(path, recommended_headers) + assert applicable_rules == expected