diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c4a27b..8421dc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,60 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.2.0] - 2026-04-26 + +Service-pack release: a large algorithmic-perf fix and a security/hardening +sweep on the public API. Same library, same nine detectors, same checksums — +just much faster on large documents and stricter about untrusted inputs. + +### Added + +- `Shield.reset()`: discard the accumulated Mapping (counters and entries) without rebuilding the Shield. Use between unrelated documents or users to prevent cross-document token leakage on `deanonymize`. Detector list and `max_input_bytes` are preserved. +- `Shield(max_input_bytes=...)` constructor option: refuses inputs whose UTF-8 byte length exceeds the cap. Default unbounded; recommended for pipelines that ingest untrusted text since `Shield.anonymize` allocates O(n) memory in input size. +- CLI `--force` flag on `anonymize` and `deanonymize`: required to overwrite an existing output or mapping file. Without it the command refuses with a clear error instead of silently clobbering. +- CLI `--max-bytes` flag on every subcommand (default 64 MiB): refuses pathologically large stdin or file inputs without crashing the process. +- `Shield` docstring documents thread-safety and the cross-document leakage class. +- `tests/test_security_hardening.py`: 24 new tests covering `Mapping.from_dict` validation paths, `Anonymizer` constructor enforcement, `Shield` input-size guard and reset behavior, and `Detector.__init_subclass__` enforcement. +- `tests/test_overlap_property.py`: Hypothesis-driven property test asserting the new bisect-based overlap resolution is set-equivalent to the previous quadratic algorithm over arbitrary match sets. + +### Changed + +- `Anonymizer._resolve_overlaps` now uses a `bisect_left`-based neighbor check instead of a linear `any(...)` scan over `taken`. 
Total lookup cost drops from O(n²) to O(n log n); each insertion into the sorted list is still O(n) because of list shifts, so the worst case remains quadratic in element moves. On a 100 KiB synthetic document with ~4900 candidate matches the median `Shield.anonymize()` latency drops from ~1700 ms to ~70 ms (≈25× faster); 1 MiB inputs that previously timed the harness out now complete in ~1.5 s. Output is byte-identical to the previous algorithm. +- `Mapping.from_dict` now validates every field at runtime: token shape (`[TYPE_NNN]`), token-prefix vs declared type, counter coverage of issued tokens, and the scalar types of values and counters. **Breaking** for callers that previously fed malformed JSON and relied on lenient acceptance — those calls now raise `ValueError`. +- `Anonymizer.__init__` now rejects: + - Detector lists with duplicate `name` attributes (duplicates previously overwrote priority-dict entries silently and broke overlap-resolution determinism). + - `Strategy` values other than `Strategy.TOKEN` (the only implemented strategy in v0.1; passing anything else was previously a silent no-op). The strategy is also stored on the instance now, ready for future `MASK` / `FAKE` dispatch. +- `Detector` base class now enforces `pii_type` and `name` presence at class-definition time via `__init_subclass__`. Subclasses missing either previously instantiated successfully and crashed on first `detect()` call. +- CLI `anonymize` / `deanonymize` now refuse to overwrite an existing output or mapping file unless `--force` is passed. **Breaking** for scripts that relied on auto-overwrite — add `--force` to preserve previous behavior. +- CLI `detect --format` is now case-insensitive (`JSON`, `Json`, `json` all accepted); previously only lowercase worked. +- `Mapping` now uses `__slots__` and `Mapping.token_for` uses an f-string instead of `str.format`. Internal performance polish; no API change. +- `Anonymizer` now caches the priority dict in `__init__` instead of rebuilding it on every `_resolve_overlaps` call. Internal; no API change. 
+- `__version__` (in `__init__.py`) now falls back to a `"0.0.0+local"` sentinel when `importlib.metadata.version("llm-safe-pl")` raises `PackageNotFoundError`. This keeps `import llm_safe_pl` working when the source tree is loaded via `PYTHONPATH` without an editable install — useful for development workflows and CI checkout-only steps. +- `examples/cli_usage.md` updated for the new `--force` and `--max-bytes` flags. +- `docs/quickstart.md`, `docs/limitations.md`, and `README.md` updated to mention the new `Shield.reset` and `max_input_bytes` capabilities and to call out the breaking CLI behavior. + +### Fixed + +- Removed silent failure modes when a custom detector subclass omitted required class variables (the error is now raised at class-definition time; see the `Detector.__init_subclass__` change above). + +### Migration notes for 0.1.x → 0.2.0 + +The two changes that may surprise existing users: + +1. **CLI overwrite now requires `--force`.** A cron job that runs + `llm-safe anonymize doc.txt -o out.txt -m map.json` daily will now fail on + the second run because `out.txt` already exists. Add `--force` (`anonymize` also accepts the short form `-f`; `deanonymize` accepts only `--force`): + `llm-safe anonymize doc.txt -o out.txt -m map.json --force`. +2. **`Mapping.from_dict` now raises on malformed JSON** that previously + loaded leniently. If you persist mappings from one process and load them + in another, mappings produced by 0.1.0 still load cleanly in 0.2.0 + (round-trip is preserved); only hand-crafted or tampered JSON triggers + the new errors. + +If neither applies to you, 0.2.0 is a drop-in upgrade with a roughly 25× speedup on +larger documents and the new `Shield.reset()` / `max_input_bytes` options +available when you want them. + ## [0.1.0] - 2026-04-22 ### Added diff --git a/README.md b/README.md index fbd64a2..7f3ab37 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Reversible PII anonymization for Polish documents, designed for LLM workflows. 
-> **Status: alpha (v0.1.0).** Core regex + checksum detection, anonymization, deanonymization, and the CLI are implemented and tested (280+ tests, ~99% coverage). The optional spaCy NER recognizer for PERSON / ORGANIZATION / LOCATION is scheduled for v0.1.1. See [CHANGELOG.md](CHANGELOG.md) and [Roadmap](#roadmap). +> **Status: alpha (v0.2.0).** Core regex + checksum detection, anonymization, deanonymization, and the CLI are implemented and tested (319 tests, ~99% coverage). v0.2.0 is a service-pack release: ~25× faster `Shield.anonymize()` on documents with thousands of PII items, plus a security-hardening pass (strict `Mapping.from_dict` validation, `Shield(max_input_bytes=...)`, `Shield.reset()`, CLI `--force` / `--max-bytes`). The optional spaCy NER recognizer for PERSON / ORGANIZATION / LOCATION is still scheduled for a later 0.x release. See [CHANGELOG.md](CHANGELOG.md) and [Roadmap](#roadmap). --- @@ -64,7 +64,9 @@ restored = shield.deanonymize(result.text) The same value always maps to the same token within a `Shield` instance, including across multiple `anonymize()` calls. Formatted identifiers (e.g. `526-000-12-46`) round-trip exactly — the dashes are preserved. -PERSON detection (`Jan Kowalski` in the example) requires `pip install "llm-safe-pl[ner]"` and is part of Phase 6. Without the extra, names remain visible and structured identifiers (PESEL, NIP, IBAN, etc.) are tokenized. +If you process unrelated documents (different users, different requests) through one Shield, call `shield.reset()` between them to drop the accumulated mapping and prevent cross-document token leakage. For pipelines that ingest untrusted text, pass `Shield(max_input_bytes=...)` to refuse oversized inputs at the boundary instead of letting them turn into an O(n) memory blowup. + +PERSON detection (`Jan Kowalski` in the example) requires `pip install "llm-safe-pl[ner]"` and is scheduled for a later 0.x release. 
Without the extra, names remain visible and structured identifiers (PESEL, NIP, IBAN, etc.) are tokenized. ## Try it live in Colab @@ -82,11 +84,15 @@ llm-safe detect document.txt --format text # Anonymize: writes rewritten text and a reversible mapping llm-safe anonymize document.txt -o anon.txt -m mapping.json +# Re-running on the same outputs requires --force (otherwise the CLI refuses +# to overwrite, since v0.2.0) +llm-safe anonymize document.txt -o anon.txt -m mapping.json --force + # Restore original values (prints to stdout, or use -o FILE) llm-safe deanonymize anon.txt -m mapping.json ``` -The CLI reads UTF-8 (with or without BOM) and UTF-16 (when a BOM is present), so files produced by PowerShell's default `>` redirection work without manual conversion. Output is always canonical UTF-8. +The CLI reads UTF-8 (with or without BOM) and UTF-16 (when a BOM is present), so files produced by PowerShell's default `>` redirection work without manual conversion. Output is always canonical UTF-8. Each subcommand also supports `--max-bytes` (default 64 MiB) to refuse pathologically large inputs. ## What's supported @@ -155,14 +161,15 @@ The 80% coverage gate is enforced in `pyproject.toml`. ## Roadmap -- **Phase 0** — Scaffolding: packaging, CI, locked public API surface, tests green. **Done.** -- **Phase 1** — `models.py`: `Match`, `Mapping`, `AnonymizeResult`, `PIIType`. **Done.** -- **Phase 2** — Checksum validators: PESEL, NIP, REGON, Luhn, mod-97 IBAN. **Done.** -- **Phase 3** — Nine regex + checksum detectors. **Done.** -- **Phase 4** — `Anonymizer` / `Deanonymizer` with consistent tokens. **Done.** -- **Phase 5** — `Shield` facade + CLI subcommands. **Done.** -- **Phase 6** — Optional spaCy NER recognizer. *Next — planned for v0.1.1.* -- **v0.2.0+** — Faker-based fake substitution, PDF/DOCX parsing, broader IBAN detector scope. +- **Phase 0** — Scaffolding: packaging, CI, locked public API surface, tests green. 
**Done in v0.1.0.** +- **Phase 1** — `models.py`: `Match`, `Mapping`, `AnonymizeResult`, `PIIType`. **Done in v0.1.0.** +- **Phase 2** — Checksum validators: PESEL, NIP, REGON, Luhn, mod-97 IBAN. **Done in v0.1.0.** +- **Phase 3** — Nine regex + checksum detectors. **Done in v0.1.0.** +- **Phase 4** — `Anonymizer` / `Deanonymizer` with consistent tokens. **Done in v0.1.0.** +- **Phase 5** — `Shield` facade + CLI subcommands. **Done in v0.1.0.** +- **v0.2.0** — Algorithmic perf fix (`Shield.anonymize()` ~25× faster on large docs), security-hardening pass (`Mapping.from_dict` strict validation, `Shield.reset()`, `Shield(max_input_bytes=...)`, CLI `--force` / `--max-bytes`). **Done.** See [CHANGELOG.md](CHANGELOG.md). +- **Next 0.x** — Optional spaCy NER recognizer for PERSON / ORGANIZATION / LOCATION via `pip install "llm-safe-pl[ner]"`. +- **Later** — Faker-based fake substitution, PDF/DOCX parsing, broader IBAN detector scope. ## Non-goals diff --git a/docs/limitations.md b/docs/limitations.md index 398c911..96e35d9 100644 --- a/docs/limitations.md +++ b/docs/limitations.md @@ -101,6 +101,24 @@ Detectors are whitespace-sensitive for the phone, IBAN, and credit card formats. - **PII types the library does not detect.** Names, organizations, and locations without the `[ner]` extra; street addresses, landline phones with parens, dates of birth, legacy bank account formats, non-Polish identifiers. See the rest of this document for the full list. - **Active adversaries inside your process.** If a compromised dependency or malicious import runs before `Shield.anonymize`, the raw document is already in memory. - **Side channels outside the prompt body.** Request metadata, IP address, timing, response-size-based inference, retained billing records. +- **Cross-document leakage on round-trip via a long-lived Shield.** A single Shield's Mapping accumulates across every `anonymize()` call. 
If a process anonymizes document A (sensitive) and later runs `deanonymize` on document B (attacker-controlled) using the same Shield, any literal `[PESEL_001]` substring in B is substituted with A's PESEL. Call `Shield.reset()` between unrelated documents/users, or instantiate a fresh `Shield` per request. + +### v0.2.0 hardening you should opt into + +The library exposes three boundary controls. The first two are off by default +because they require a deployment decision; turn them on when you process +untrusted text. The third, strict `Mapping` validation, is always on: + +- `Shield(max_input_bytes=...)` — refuses inputs whose UTF-8 byte length + exceeds the cap. Without it, `Shield.anonymize` allocates O(n) memory in + input size, so unbounded input is a denial-of-service vector. +- `Shield.reset()` between unrelated calls — drops the accumulated Mapping + so cross-document leakage on round-trip cannot occur (see previous + section). +- Persisted `Mapping` JSON is validated strictly on load + (`Mapping.from_dict` / `from_json` raise on tampered or malformed input). + This protects you from accepting a hostile mapping file that would + otherwise silently corrupt subsequent `deanonymize` calls. ### Assumptions diff --git a/docs/quickstart.md b/docs/quickstart.md index 09fe5d3..e9eb14b 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -59,6 +59,41 @@ A few things to notice: - `shield.deanonymize(text)` with no mapping argument uses the Shield's own mapping. Pass an explicit `Mapping` to deanonymize against a saved state. - Detected PII formats are preserved: `526-000-12-46` stays dashed, `4532 0151 1283 0366` stays spaced. The round-trip reproduces the source byte-for-byte. +## Reusing a Shield across unrelated documents + +Because the Mapping is shared across calls, processing two unrelated documents through the same Shield mixes their tokens. 
If the second document contains attacker-controlled text with a literal `[PESEL_001]` substring, `deanonymize` will substitute it with the *first* document's PESEL value. Use `Shield.reset()` between unrelated documents to drop the accumulated mapping: + +```python +shield = Shield() + +# Document A — internal, trusted. +result_a = shield.anonymize(doc_a) +restored_a = shield.deanonymize(llm_response_a) + +# Discard A's tokens before touching B. +shield.reset() + +# Document B — could be untrusted. +result_b = shield.anonymize(doc_b) +restored_b = shield.deanonymize(llm_response_b) +``` + +`reset()` keeps the detector list and any `max_input_bytes` setting; it only drops the Mapping. Equivalent to instantiating a fresh `Shield()` but cheaper if you have a custom detector list. + +## Guarding against oversized input + +`Shield.anonymize` allocates O(n) memory in input size. For pipelines that ingest untrusted text, set `max_input_bytes` to refuse oversized inputs at the boundary instead of letting them OOM the process: + +```python +# Refuse anything over 1 MiB. +shield = Shield(max_input_bytes=1024 * 1024) + +shield.anonymize(very_large_text) +# ValueError: input is 5242880 bytes; max_input_bytes=1048576 +``` + +Default is unbounded. Set it whenever the upstream caller can't be trusted. + ## CLI Everything the Python API does is also available from a shell: @@ -70,11 +105,14 @@ llm-safe detect document.txt # Anonymize; writes two files. llm-safe anonymize document.txt -o anon.txt -m mapping.json +# Re-running on the same outputs requires --force (since v0.2.0). +llm-safe anonymize document.txt -o anon.txt -m mapping.json --force + # Restore originals. llm-safe deanonymize anon.txt -m mapping.json -o restored.txt ``` -See [`cli_usage.md`](../examples/cli_usage.md) for more. +Each subcommand also supports `--max-bytes` (default 64 MiB) to refuse oversized stdin or file inputs. See [`cli_usage.md`](../examples/cli_usage.md) for more. 
## Saving and loading mappings @@ -95,13 +133,15 @@ shield = Shield(mapping=loaded) # Any anonymize() call will reuse tokens already allocated in `loaded`. ``` +`Mapping.from_dict` / `from_json` validate every field at load time: token shape, type-prefix consistency, and counter coverage of issued tokens. Tampered or hand-edited mapping JSON raises `ValueError` rather than loading silently. Mappings produced by `Mapping.to_json` always round-trip cleanly. + ## What Shield detects - PESEL, NIP, REGON (Polish government IDs, all checksum-validated) -- Polish ID card (dowód osobisty), passport (regex-only for v0.1) +- Polish ID card (dowód osobisty), passport (regex-only) - Phone, email, PL IBAN, credit card (Luhn-validated, 13-19 digits) -Person, organization, and location names require the optional `[ner]` extra — planned for v0.1.1. +Person, organization, and location names require the optional `[ner]` extra — scheduled for a later 0.x release. ## Next steps diff --git a/examples/cli_usage.md b/examples/cli_usage.md index 658edde..8f10201 100644 --- a/examples/cli_usage.md +++ b/examples/cli_usage.md @@ -46,6 +46,17 @@ cat mapping.json Now it's safe to send `anonymized.txt` to any LLM API. +Re-running on the same outputs requires `--force` (since v0.2.0). The CLI refuses to silently overwrite an existing `-o` or `-m` file: + +```bash +llm-safe anonymize document.txt -o anonymized.txt -m mapping.json +# Usage: llm-safe anonymize ... +# Error: anonymized.txt exists; pass --force to overwrite + +llm-safe anonymize document.txt -o anonymized.txt -m mapping.json --force +# (overwrites both) +``` + ## Deanonymize Restore original values using a mapping produced by `anonymize`. 
@@ -56,6 +67,9 @@ llm-safe deanonymize anonymized.txt -m mapping.json # To a file llm-safe deanonymize anonymized.txt -m mapping.json -o restored.txt + +# --force is required to overwrite an existing output file (since v0.2.0) +llm-safe deanonymize anonymized.txt -m mapping.json -o restored.txt --force ``` ## End-to-end round-trip in one shell @@ -96,6 +110,17 @@ The CLI accepts UTF-8 (with or without BOM) and UTF-16 LE/BE when a BOM is prese Output is always canonical UTF-8 without BOM. +## Input-size cap + +Every subcommand supports `--max-bytes` (default 64 MiB). Inputs larger than that are refused with a clear error instead of being slurped into memory. Useful when piping from an untrusted source: + +```bash +# Refuse anything over 1 MiB. +some_user_program | llm-safe anonymize - -o out.txt -m map.json --max-bytes $((1024 * 1024)) +``` + +Set it lower than the default if you know your real inputs are bounded; raising it above 64 MiB is allowed but treats the host's RAM as the only ceiling. + ## Help ```bash diff --git a/examples/hardening.py b/examples/hardening.py new file mode 100644 index 0000000..a430f7d --- /dev/null +++ b/examples/hardening.py @@ -0,0 +1,57 @@ +"""Hardening features added in v0.2.0: Shield.reset() and max_input_bytes. + +Two short demos, each independent of the other. + +Run: python examples/hardening.py +""" + +from llm_safe_pl import Shield + + +def demo_reset() -> None: + """Reset the accumulated mapping between unrelated documents.""" + print("--- Demo 1: Shield.reset() ---") + shield = Shield() + + # Document A — sensitive, internal. + doc_a = "Klient: PESEL 44051401359." + result_a = shield.anonymize(doc_a) + print(f"After document A: mapping has {len(shield.mapping)} entry/entries.") + print(f" text: {result_a.text}") + + # Without reset(), document A's tokens persist into the next call. + # If document B happens to contain a literal '[PESEL_001]' (e.g. 
an LLM + # response that the caller forgot to validate), `deanonymize` would + # substitute it with A's PESEL. + shield.reset() + print(f"After reset(): mapping has {len(shield.mapping)} entry/entries.") + + # Document B — different user, different request. + doc_b = "Inny klient: PESEL 92010100003." + result_b = shield.anonymize(doc_b) + print(f"After document B: mapping has {len(shield.mapping)} entry/entries.") + print(f" text: {result_b.text}") + print() + + +def demo_max_input_bytes() -> None: + """Refuse oversized input at the boundary.""" + print("--- Demo 2: max_input_bytes ---") + # Cap at 100 bytes for demonstration; realistic values are MiB-scale. + shield = Shield(max_input_bytes=100) + + small = "PESEL 44051401359 — fits." + print(f"Small input ({len(small.encode('utf-8'))} bytes): accepted.") + shield.anonymize(small) + + big = "x" * 200 + print(f"Big input ({len(big.encode('utf-8'))} bytes): rejected.") + try: + shield.anonymize(big) + except ValueError as exc: + print(f" ValueError: {exc}") + + +if __name__ == "__main__": + demo_reset() + demo_max_input_bytes() diff --git a/notebooks/quickstart.ipynb b/notebooks/quickstart.ipynb index 8381411..b961607 100644 --- a/notebooks/quickstart.ipynb +++ b/notebooks/quickstart.ipynb @@ -2,6 +2,7 @@ "cells": [ { "cell_type": "markdown", + "id": "7fb27b941602401d91542211134fc71a", "metadata": {}, "source": [ "# llm-safe-pl — anonymize Polish PII before sending documents to an LLM\n", @@ -23,6 +24,7 @@ { "cell_type": "code", "execution_count": null, + "id": "acae54e37e7d407bbb7b55eff062a284", "metadata": {}, "outputs": [], "source": [ @@ -31,6 +33,7 @@ }, { "cell_type": "markdown", + "id": "9a63283cbaf04dbcab1f6479b197f3a8", "metadata": {}, "source": [ "## The scenario\n", @@ -41,6 +44,7 @@ { "cell_type": "code", "execution_count": null, + "id": "8dd0d8092fe74a7c96281538738b07e2", "metadata": {}, "outputs": [], "source": [ @@ -63,6 +67,7 @@ }, { "cell_type": "markdown", + "id": 
"72eea5119410473aa328ad9291626812", "metadata": {}, "source": [ "## Step 1 — detect\n", @@ -73,12 +78,23 @@ { "cell_type": "code", "execution_count": null, + "id": "8edb47106e1a46a883d545849b8ab81b", "metadata": {}, "outputs": [], - "source": "from llm_safe_pl import Shield\n\nshield = Shield()\nmatches = shield.detect(document)\n\nprint(f\"Found {len(matches)} PII hits:\\n\")\nfor m in matches:\n print(f\" [{m.type.value:<11}] {m.value!r:<40} at {m.start}-{m.end} (detector: {m.detector})\")" + "source": [ + "from llm_safe_pl import Shield\n", + "\n", + "shield = Shield()\n", + "matches = shield.detect(document)\n", + "\n", + "print(f\"Found {len(matches)} PII hits:\\n\")\n", + "for m in matches:\n", + " print(f\" [{m.type.value:<11}] {m.value!r:<40} at {m.start}-{m.end} (detector: {m.detector})\")" + ] }, { "cell_type": "markdown", + "id": "10185d26023b46108eb7d9f57d49d2b3", "metadata": {}, "source": [ "## Step 2 — anonymize\n", @@ -91,6 +107,7 @@ { "cell_type": "code", "execution_count": null, + "id": "8763a12b2bbd4a93a75aff182afb95dc", "metadata": {}, "outputs": [], "source": [ @@ -102,6 +119,7 @@ }, { "cell_type": "markdown", + "id": "7623eae2785240b9bd12b16a66d81610", "metadata": {}, "source": [ "## Step 3 — call the LLM\n", @@ -116,12 +134,47 @@ { "cell_type": "code", "execution_count": null, + "id": "7cdc8c89c7104fffa095e18ddfef8986", "metadata": {}, "outputs": [], - "source": "import os\n\nSYSTEM = (\n \"You are a Polish-language customer service assistant. \"\n \"Summarize the user's message in 3 bullet points. 
\"\n \"Keep every placeholder of the form [TYPE_NNN] intact — do not rename, \"\n \"translate, or expand them.\"\n)\n\nif os.environ.get(\"OPENAI_API_KEY\"):\n from openai import OpenAI\n\n client = OpenAI()\n response = client.chat.completions.create(\n model=\"gpt-4o-mini\",\n messages=[\n {\"role\": \"system\", \"content\": SYSTEM},\n {\"role\": \"user\", \"content\": result.text},\n ],\n )\n llm_output = response.choices[0].message.content or \"\"\nelse:\n llm_output = (\n \"Podsumowanie zgłoszenia:\\n\"\n \"- Klient [PESEL_001] zgłasza brak przelewu dla zamówienia INV-2025-00412.\\n\"\n \"- Kontakt zwrotny: [PHONE_001] lub [EMAIL_001].\\n\"\n \"- Faktura VAT [NIP_001], REGON [REGON_001].\\n\"\n \"- IBAN [IBAN_001] — sprawdzić status transakcji.\"\n )\n\nprint(\"LLM response (still anonymized):\\n\")\nprint(llm_output)" + "source": [ + "import os\n", + "\n", + "SYSTEM = (\n", + " \"You are a Polish-language customer service assistant. \"\n", + " \"Summarize the user's message in 3 bullet points. 
\"\n", + " \"Keep every placeholder of the form [TYPE_NNN] intact — do not rename, \"\n", + " \"translate, or expand them.\"\n", + ")\n", + "\n", + "if os.environ.get(\"OPENAI_API_KEY\"):\n", + " from openai import OpenAI\n", + "\n", + " client = OpenAI()\n", + " response = client.chat.completions.create(\n", + " model=\"gpt-4o-mini\",\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": SYSTEM},\n", + " {\"role\": \"user\", \"content\": result.text},\n", + " ],\n", + " )\n", + " llm_output = response.choices[0].message.content or \"\"\n", + "else:\n", + " llm_output = (\n", + " \"Podsumowanie zgłoszenia:\\n\"\n", + " \"- Klient [PESEL_001] zgłasza brak przelewu dla zamówienia INV-2025-00412.\\n\"\n", + " \"- Kontakt zwrotny: [PHONE_001] lub [EMAIL_001].\\n\"\n", + " \"- Faktura VAT [NIP_001], REGON [REGON_001].\\n\"\n", + " \"- IBAN [IBAN_001] — sprawdzić status transakcji.\"\n", + " )\n", + "\n", + "print(\"LLM response (still anonymized):\\n\")\n", + "print(llm_output)" + ] }, { "cell_type": "markdown", + "id": "b118ea5561624da68c537baed56e602f", "metadata": {}, "source": [ "## Step 4 — deanonymize\n", @@ -132,6 +185,7 @@ { "cell_type": "code", "execution_count": null, + "id": "938c804e27f84196a10c8828c723f798", "metadata": {}, "outputs": [], "source": [ @@ -143,6 +197,7 @@ }, { "cell_type": "markdown", + "id": "504fb2a444614c0babb325280ed9130a", "metadata": {}, "source": [ "## Persisting the mapping\n", @@ -153,6 +208,7 @@ { "cell_type": "code", "execution_count": null, + "id": "59bbdb311c014d738909a11f9e486628", "metadata": {}, "outputs": [], "source": [ @@ -171,6 +227,53 @@ }, { "cell_type": "markdown", + "id": "b43b363d81ae4b689946ece5c682cd59", + "metadata": {}, + "source": [ + "## Reusing a Shield safely (v0.2.0)\n", + "\n", + "If you process more than one document or more than one user through a single Shield, the accumulated `Mapping` persists across calls — that's how a repeated value gets the same token across documents. 
The flip side: if you later call `deanonymize` on attacker-controlled text that contains a literal `[PESEL_001]` substring, it will be substituted with the *first* document's PESEL.\n", + "\n", + "Two boundary controls landed in v0.2.0 to handle this:\n", + "\n", + "- `Shield.reset()` drops the accumulated mapping; use between unrelated documents.\n", + "- `Shield(max_input_bytes=N)` refuses inputs over N bytes; use for pipelines that ingest untrusted text.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8a65eabff63a45729fe45fb5ade58bdc", + "metadata": {}, + "outputs": [], + "source": [ + "# Demonstrate reset(): two unrelated documents stay isolated.\n", + "shield_b = Shield()\n", + "\n", + "# Document A — internal, trusted.\n", + "result_a = shield_b.anonymize(\"Klient A: PESEL 44051401359.\")\n", + "print(\"After A:\", result_a.text)\n", + "print(\" mapping size:\", len(shield_b.mapping))\n", + "\n", + "# Reset before processing the next document.\n", + "shield_b.reset()\n", + "\n", + "# Document B — different user, different request.\n", + "result_b = shield_b.anonymize(\"Klient B: PESEL 92010100003.\")\n", + "print(\"After B:\", result_b.text)\n", + "print(\" mapping size:\", len(shield_b.mapping))\n", + "\n", + "# Refuse oversized input at the boundary (default is unbounded).\n", + "guarded = Shield(max_input_bytes=64 * 1024) # 64 KiB cap\n", + "try:\n", + " guarded.anonymize(\"x\" * 100_000)\n", + "except ValueError as exc:\n", + " print(\"Refused:\", exc)" + ] + }, + { + "cell_type": "markdown", + "id": "c3933fab20d04ec698c2621248eb3be0", "metadata": {}, "source": [ "## What this install did — and did not — catch\n", @@ -190,7 +293,7 @@ "python -m spacy download pl_core_news_lg\n", "```\n", "\n", - "NER support ships in v0.1.1 — see the [roadmap](https://github.com/Tatarinho/llm-safe-pl#roadmap).\n", + "NER support is scheduled for a later 0.x release — see the [roadmap](https://github.com/Tatarinho/llm-safe-pl#roadmap).\n", "\n", 
"## Next steps\n", "\n", @@ -198,7 +301,7 @@ "- [`docs/llm_workflow.md`](https://github.com/Tatarinho/llm-safe-pl/blob/main/docs/llm_workflow.md) — deeper guidance on the anonymize → LLM → deanonymize pattern.\n", "- [`docs/limitations.md`](https://github.com/Tatarinho/llm-safe-pl/blob/main/docs/limitations.md) — read before shipping to production.\n", "\n", - "Found a false positive, a missed identifier, or have a feature idea? [Open an issue](https://github.com/Tatarinho/llm-safe-pl/issues). Stars welcome." + "Found a false positive, a missed identifier, or have a feature idea? [Open an issue](https://github.com/Tatarinho/llm-safe-pl/issues). Stars welcome.\n" ] } ], @@ -217,4 +320,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/pyproject.toml b/pyproject.toml index 1f53fb6..ecb0347 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "llm-safe-pl" -version = "0.1.0" +version = "0.2.0" description = "Reversible PII anonymization for Polish documents, designed for LLM workflows." readme = "README.md" license = "MIT" diff --git a/src/llm_safe_pl/__init__.py b/src/llm_safe_pl/__init__.py index 5bcd4b2..9fb8c99 100644 --- a/src/llm_safe_pl/__init__.py +++ b/src/llm_safe_pl/__init__.py @@ -4,12 +4,19 @@ implementation detail and may change without a major version bump. """ +from importlib.metadata import PackageNotFoundError from importlib.metadata import version as _version from llm_safe_pl.models import AnonymizeResult, Mapping, Match, PIIType from llm_safe_pl.shield import Shield -__version__ = _version("llm-safe-pl") +try: + __version__ = _version("llm-safe-pl") +except PackageNotFoundError: + # Bare-clone import (PYTHONPATH=src python -c "import llm_safe_pl") without + # an editable install lacks distribution metadata. Use a sentinel so import + # succeeds in dev workflows that haven't run `pip install -e .` yet. 
+ __version__ = "0.0.0+local" __all__ = [ "AnonymizeResult", diff --git a/src/llm_safe_pl/anonymizer.py b/src/llm_safe_pl/anonymizer.py index 7b6c73c..653751d 100644 --- a/src/llm_safe_pl/anonymizer.py +++ b/src/llm_safe_pl/anonymizer.py @@ -9,6 +9,8 @@ from __future__ import annotations +from bisect import bisect_left + from llm_safe_pl.detectors.base import Detector from llm_safe_pl.models import AnonymizeResult, Mapping, Match from llm_safe_pl.strategies import Strategy @@ -23,11 +25,34 @@ def __init__( mapping: Mapping, strategy: Strategy = Strategy.TOKEN, ) -> None: + # Detector names participate in the overlap-resolution priority dict + # below; duplicates would silently overwrite, breaking determinism. + seen_names: set[str] = set() + for d in detectors: + if d.name in seen_names: + raise ValueError(f"Duplicate detector name: {d.name!r}") + seen_names.add(d.name) self._detectors = detectors self._mapping = mapping + # Strategy is stored ready for future MASK/FAKE dispatch. v0.1 only + # implements TOKEN; passing anything else is reserved for future use + # rather than silently dropped. + if strategy is not Strategy.TOKEN: + raise ValueError(f"Strategy {strategy!r} not implemented in v0.1") + self._strategy = strategy + # Cached once at construction — detectors are immutable for the + # Anonymizer's lifetime, so the priority map is too. + self._priority: dict[str, int] = {d.name: i for i, d in enumerate(detectors)} + self._priority_fallback = len(self._priority) def detect(self, text: str) -> list[Match]: - """Find all PII matches with overlaps resolved, without mutating Mapping.""" + """Find all PII matches with overlaps resolved, without mutating Mapping. + + Returns a fresh ``list[Match]`` for performance — internal callers can + sort in place. The public-facing immutable view is ``Shield.detect``, + which wraps this result in a tuple. Treat the returned list as + read-only unless you own the Anonymizer instance. 
+ """ all_matches: list[Match] = [] for detector in self._detectors: all_matches.extend(detector.detect(text)) @@ -52,17 +77,27 @@ def anonymize(self, text: str) -> AnonymizeResult: ) def _resolve_overlaps(self, matches: list[Match]) -> list[Match]: - priority = {d.name: i for i, d in enumerate(self._detectors)} - fallback = len(priority) + priority = self._priority + fallback = self._priority_fallback def sort_key(m: Match) -> tuple[int, int, int]: length = m.end - m.start return (-length, m.start, priority.get(m.detector, fallback)) + # Invariant: ``taken`` stays sorted by start and pairwise non-overlapping. + # A new candidate can only overlap its left or right neighbor in start order, + # so a single bisect lookup checks both. Replaces an O(n^2) linear scan that + # dominated runtime on documents with thousands of PII items. taken: list[Match] = [] + starts: list[int] = [] for m in sorted(matches, key=sort_key): - if not any(_overlaps(m, t) for t in taken): - taken.append(m) + i = bisect_left(starts, m.start) + if i > 0 and taken[i - 1].end > m.start: + continue + if i < len(taken) and taken[i].start < m.end: + continue + starts.insert(i, m.start) + taken.insert(i, m) return taken diff --git a/src/llm_safe_pl/cli.py b/src/llm_safe_pl/cli.py index 002a696..eb82a8d 100644 --- a/src/llm_safe_pl/cli.py +++ b/src/llm_safe_pl/cli.py @@ -52,17 +52,40 @@ def _root( """llm-safe-pl — reversible PII anonymization for Polish documents.""" -def _read_text(source: Path) -> str: +_DEFAULT_MAX_BYTES = 64 * 1024 * 1024 # 64 MiB; protects against unbounded stdin + + +def _read_text(source: Path, max_bytes: int = _DEFAULT_MAX_BYTES) -> str: """Read text from a file path, or from stdin when ``source`` is ``-``. Accepts UTF-8 (±BOM) and UTF-16 (±endianness) with BOM in either case. + Refuses inputs larger than ``max_bytes`` to prevent unbounded memory use. 
""" - data = sys.stdin.buffer.read() if str(source) == "-" else source.read_bytes() + if str(source) == "-": + data = sys.stdin.buffer.read(max_bytes + 1) + else: + size = source.stat().st_size + if size > max_bytes: + raise typer.BadParameter( + f"{source} is {size} bytes; --max-bytes={max_bytes}", + ) + data = source.read_bytes() + if len(data) > max_bytes: + raise typer.BadParameter( + f"input exceeds --max-bytes={max_bytes}", + ) if data[:2] in (b"\xff\xfe", b"\xfe\xff"): return data.decode("utf-16") return data.decode("utf-8-sig") +def _check_overwrite(path: Path, force: bool) -> None: + if path.exists() and not force: + raise typer.BadParameter( + f"{path} exists; pass --force to overwrite", + ) + + @app.command("anonymize") def anonymize_cmd( input_file: Annotated[Path, typer.Argument(help="Text file to anonymize (use - for stdin).")], @@ -70,9 +93,18 @@ def anonymize_cmd( mapping: Annotated[ Path, typer.Option("--mapping", "-m", help="Path to write the Mapping JSON.") ], + force: Annotated[ + bool, + typer.Option("--force", "-f", help="Overwrite output and mapping files if they exist."), + ] = False, + max_bytes: Annotated[ + int, typer.Option("--max-bytes", help="Refuse inputs larger than this many bytes.") + ] = _DEFAULT_MAX_BYTES, ) -> None: """Anonymize a text file; writes rewritten text and a reversible mapping.""" - text = _read_text(input_file) + _check_overwrite(output, force) + _check_overwrite(mapping, force) + text = _read_text(input_file, max_bytes=max_bytes) shield = Shield() result = shield.anonymize(text) output.write_text(result.text, encoding="utf-8") @@ -90,10 +122,18 @@ def deanonymize_cmd( Path | None, typer.Option("--output", "-o", help="Write restored text here (stdout if omitted or -)."), ] = None, + force: Annotated[ + bool, typer.Option("--force", help="Overwrite output file if it exists.") + ] = False, + max_bytes: Annotated[ + int, typer.Option("--max-bytes", help="Refuse inputs larger than this many bytes.") + ] = 
_DEFAULT_MAX_BYTES,
 ) -> None:
     """Deanonymize a text file using a saved mapping."""
-    text = _read_text(input_file)
-    loaded_mapping = Mapping.from_json(_read_text(mapping))
+    if output is not None and str(output) != "-":
+        _check_overwrite(output, force)
+    text = _read_text(input_file, max_bytes=max_bytes)
+    loaded_mapping = Mapping.from_json(_read_text(mapping, max_bytes=max_bytes))
     shield = Shield(mapping=loaded_mapping)
     restored = shield.deanonymize(text)
     if output is None or str(output) == "-":
@@ -108,12 +148,16 @@ def detect_cmd(
     output_format: Annotated[
         str, typer.Option("--format", "-f", help="Output format: json or text.")
     ] = "json",
+    max_bytes: Annotated[
+        int, typer.Option("--max-bytes", help="Refuse inputs larger than this many bytes.")
+    ] = _DEFAULT_MAX_BYTES,
 ) -> None:
     """Detect PII without anonymizing; prints to stdout."""
-    text = _read_text(input_file)
+    text = _read_text(input_file, max_bytes=max_bytes)
     shield = Shield()
     matches = shield.detect(text)
-    if output_format == "json":
+    fmt = output_format.lower()
+    if fmt == "json":
         data = [
             {
                 "type": m.type.value,
@@ -125,7 +169,7 @@ def detect_cmd(
             for m in matches
         ]
         typer.echo(json.dumps(data, ensure_ascii=False, indent=2))
-    elif output_format == "text":
+    elif fmt == "text":
         for m in matches:
             typer.echo(f"{m.type.value}\t{m.start}-{m.end}\t{m.value}")
     else:
diff --git a/src/llm_safe_pl/detectors/base.py b/src/llm_safe_pl/detectors/base.py
index de33ddd..4de52a8 100644
--- a/src/llm_safe_pl/detectors/base.py
+++ b/src/llm_safe_pl/detectors/base.py
@@ -13,7 +13,7 @@
 import re
 from abc import ABC, abstractmethod
 from collections.abc import Iterator
-from typing import ClassVar
+from typing import Any, ClassVar
 from llm_safe_pl.models import Match, PIIType
@@ -24,6 +24,19 @@ class Detector(ABC):
     pii_type: ClassVar[PIIType]
     name: ClassVar[str]
+    def __init_subclass__(cls, **kwargs: Any) -> None:
+        super().__init_subclass__(**kwargs)
+        # Subclasses must declare ``pii_type`` and ``name``.
ABC's
+        # ``@abstractmethod`` only enforces missing methods, not missing
+        # class variables; without this check, a subclass that forgets
+        # ``name`` would instantiate fine and crash at first ``detect``
+        # call. Caught at class-definition time instead.
+        if cls.__name__ in {"RegexDetector"}:
+            return  # the abstract regex helper isn't a concrete detector
+        for required in ("pii_type", "name"):
+            if not hasattr(cls, required):
+                raise TypeError(f"{cls.__name__} must define class variable {required!r}")
+
     @abstractmethod
     def detect(self, text: str) -> Iterator[Match]:
         """Yield every PII occurrence found in ``text``."""
diff --git a/src/llm_safe_pl/models.py b/src/llm_safe_pl/models.py
index 1b51dea..54cb1bb 100644
--- a/src/llm_safe_pl/models.py
+++ b/src/llm_safe_pl/models.py
@@ -7,10 +7,13 @@
 from __future__ import annotations
 import json
+import re
 from dataclasses import dataclass
 from enum import Enum
 from typing import Any
+_TOKEN_SHAPE = re.compile(r"^\[([A-Z][A-Z_]*)_(\d+)\]$")
+
 class PIIType(str, Enum):
     """Categories of personally identifiable information the library can handle.
@@ -57,8 +60,9 @@ class Mapping:
     that owns it) across threads unless the caller serializes writes.
     """
+    __slots__ = ("_counters", "_forward", "_reverse")
+
     SCHEMA_VERSION = 1
-    _TOKEN_FORMAT = "[{type}_{counter:03d}]"
     def __init__(self) -> None:
         self._forward: dict[tuple[PIIType, str], str] = {}
@@ -72,7 +76,7 @@ def token_for(self, value: str, pii_type: PIIType) -> str:
             return existing
         counter = self._counters.get(pii_type, 0) + 1
         self._counters[pii_type] = counter
-        token = self._TOKEN_FORMAT.format(type=pii_type.value.upper(), counter=counter)
+        token = f"[{pii_type.value.upper()}_{counter:03d}]"
         self._forward[key] = token
         self._reverse[token] = (pii_type, value)
         return token
@@ -96,17 +100,71 @@ def to_dict(self) -> dict[str, Any]:
     @classmethod
     def from_dict(cls, data: dict[str, Any]) -> Mapping:
+        """Load a Mapping from its JSON-dict shape with strict validation.
+
+        Raises ``ValueError`` on any of: wrong schema version, malformed
+        token shape, type/token-prefix mismatch, counters that don't cover
+        their entries, non-int counter values, missing required fields.
+
+        Validation matters because Mapping JSON is the cross-process trust
+        boundary: a tampered file should fail loudly, not silently corrupt
+        the Mapping.
+        """
+        if not isinstance(data, dict):
+            raise ValueError(f"Mapping.from_dict expected a dict, got {type(data).__name__}")
         version = data.get("schema_version")
         if version != cls.SCHEMA_VERSION:
             raise ValueError(f"Unsupported mapping schema version: {version!r}")
+
+        raw_counters = data.get("counters", {})
+        if not isinstance(raw_counters, dict):
+            raise ValueError(f"counters must be a dict, got {type(raw_counters).__name__}")
+        counters: dict[PIIType, int] = {}
+        for t, n in raw_counters.items():
+            if not isinstance(n, int) or isinstance(n, bool) or n < 0:
+                raise ValueError(f"counter for {t!r} must be a non-negative int, got {n!r}")
+            counters[PIIType(t)] = n
+
+        raw_entries = data.get("entries")
+        if raw_entries is None:
+            raise ValueError("Mapping.from_dict requires an 'entries' field")
+        if not isinstance(raw_entries, list):
+            raise ValueError(f"entries must be a list, got {type(raw_entries).__name__}")
+
         m = cls()
-        m._counters = {PIIType(t): int(n) for t, n in data.get("counters", {}).items()}
-        for entry in data["entries"]:
+        m._counters = counters
+        max_per_type: dict[PIIType, int] = {}
+        for entry in raw_entries:
+            if not isinstance(entry, dict):
+                raise ValueError(f"each entry must be a dict, got {type(entry).__name__}")
+            for required in ("token", "type", "value"):
+                if required not in entry:
+                    raise ValueError(f"entry missing required field {required!r}: {entry!r}")
             token = entry["token"]
-            pii_type = PIIType(entry["type"])
             value = entry["value"]
+            if not isinstance(token, str) or not isinstance(value, str):
+                raise ValueError(f"entry token and value must be strings: {entry!r}")
+            pii_type =
PIIType(entry["type"])
+            shape = _TOKEN_SHAPE.fullmatch(token)
+            if shape is None:
+                raise ValueError(f"token {token!r} does not match [TYPE_NNN] shape")
+            token_type_prefix = shape.group(1)
+            if token_type_prefix != pii_type.value.upper():
+                raise ValueError(f"token {token!r} prefix does not match type {pii_type.value!r}")
+            counter_n = int(shape.group(2))
+            prev = max_per_type.get(pii_type, 0)
+            if counter_n > prev:
+                max_per_type[pii_type] = counter_n
             m._forward[(pii_type, value)] = token
             m._reverse[token] = (pii_type, value)
+
+        for pii_type, observed_max in max_per_type.items():
+            declared = counters.get(pii_type, 0)
+            if declared < observed_max:
+                raise ValueError(
+                    f"counter for {pii_type.value!r} is {declared} but entry "
+                    f"counter {observed_max} was issued"
+                )
         return m
     def to_json(self) -> str:
diff --git a/src/llm_safe_pl/shield.py b/src/llm_safe_pl/shield.py
index fc98ec8..0a6f2a4 100644
--- a/src/llm_safe_pl/shield.py
+++ b/src/llm_safe_pl/shield.py
@@ -3,8 +3,20 @@
 A Shield instance owns a single Mapping that accumulates tokens across every
 ``anonymize()`` call, so the same value always maps to the same token within
 the lifetime of that Shield. Users wanting isolation between documents should
-instantiate a new Shield per document. Custom detector lists and a
-preloaded Mapping can be supplied to the constructor.
+instantiate a new Shield per document, or call :meth:`Shield.reset` to drop
+accumulated state. Custom detector lists and a preloaded Mapping can be
+supplied to the constructor.
+
+Thread safety: a single Shield is NOT thread-safe. ``Mapping.token_for``
+mutates state without locking, so concurrent ``anonymize`` calls on the same
+Shield can race. Use one Shield per request/thread, or serialize calls
+externally.
+
+Cross-document leakage: because the Mapping persists across calls, feeding
+attacker-controlled text containing literal token shapes (e.g.
``[PESEL_001]``)
+through ``deanonymize`` on a Shield that previously processed sensitive text
+will substitute the attacker's token with the prior value. Always create a
+fresh Shield (or call ``reset()``) before processing untrusted text.
 """
 from __future__ import annotations
@@ -18,13 +30,25 @@
 class Shield:
-    """Orchestrates the full anonymize/deanonymize round-trip."""
+    """Orchestrates the full anonymize/deanonymize round-trip.
+
+    Args:
+        detectors: Custom detector list (default: ``DEFAULT_DETECTORS``).
+        mapping: Preloaded Mapping (default: empty Mapping).
+        strategy: Anonymization strategy (only ``TOKEN`` in v0.1).
+        max_input_bytes: If set, ``anonymize``/``detect`` raise ``ValueError``
+            for inputs whose UTF-8 byte length exceeds this. Default ``None``
+            (unlimited). Recommended for hardened pipelines that ingest
+            untrusted text: ``Shield.anonymize`` allocates O(n) memory in
+            input size, so an unbounded input is a DoS vector.
+    """
     def __init__(
         self,
         detectors: list[Detector] | None = None,
         mapping: Mapping | None = None,
         strategy: Strategy = Strategy.TOKEN,
+        max_input_bytes: int | None = None,
     ) -> None:
         self._mapping = mapping if mapping is not None else Mapping()
         self._detectors = list(detectors) if detectors is not None else list(DEFAULT_DETECTORS)
@@ -34,12 +58,36 @@ def __init__(
             strategy=strategy,
         )
         self._deanonymizer = Deanonymizer()
+        if max_input_bytes is not None and max_input_bytes < 0:
+            raise ValueError(f"max_input_bytes must be non-negative, got {max_input_bytes}")
+        self._max_input_bytes = max_input_bytes
     @property
     def mapping(self) -> Mapping:
         return self._mapping
+    def reset(self) -> None:
+        """Discard the accumulated Mapping; counters and entries reset to empty.
+
+        Use between unrelated documents/users to prevent cross-document token
+        leakage. Detector list and other Shield configuration are preserved.
+        """
+        self._mapping = Mapping()
+        self._anonymizer = Anonymizer(
+            detectors=self._detectors,
+            mapping=self._mapping,
+            strategy=self._anonymizer._strategy,
+        )
+
+    def _check_input_size(self, text: str) -> None:
+        if self._max_input_bytes is None:
+            return
+        size = len(text.encode("utf-8"))
+        if size > self._max_input_bytes:
+            raise ValueError(f"input is {size} bytes; max_input_bytes={self._max_input_bytes}")
+
     def anonymize(self, text: str) -> AnonymizeResult:
+        self._check_input_size(text)
         return self._anonymizer.anonymize(text)
     def deanonymize(self, text: str, mapping: Mapping | None = None) -> str:
@@ -48,5 +96,6 @@ def deanonymize(self, text: str, mapping: Mapping | None = None) -> str:
         )
     def detect(self, text: str) -> tuple[Match, ...]:
+        self._check_input_size(text)
         matches = self._anonymizer.detect(text)
         return tuple(sorted(matches, key=lambda m: m.start))
diff --git a/tests/test_anonymizer.py b/tests/test_anonymizer.py
index 9df758d..98ef2dc 100644
--- a/tests/test_anonymizer.py
+++ b/tests/test_anonymizer.py
@@ -1,6 +1,7 @@
 """Tests for the Anonymizer orchestrator."""
 import re
+from itertools import pairwise
 from typing import ClassVar
 import pytest
@@ -219,3 +220,93 @@ def test_multi_detector_replacement_order_is_by_start(
         result = anon.anonymize(text)
         starts = [m.start for m in result.matches]
         assert starts == sorted(starts)
+
+
+class TestAnonymizerOverlapResolutionStress:
+    """Stress tests pinning the bisect-based overlap-resolution against the
+    naive O(n^2) reference. Inputs constructed to exercise the path that
+    previously dominated runtime on large documents (~5000 PII items).
+    """
+
+    @staticmethod
+    def _naive_resolve(matches: list, detectors: list) -> list:
+        """Reference implementation: the original O(n^2) algorithm."""
+        priority = {d.name: i for i, d in enumerate(detectors)}
+        fallback = len(priority)
+
+        def sort_key(m):  # type: ignore[no-untyped-def]
+            return (-(m.end - m.start), m.start, priority.get(m.detector, fallback))
+
+        def overlaps(a, b):  # type: ignore[no-untyped-def]
+            return a.start < b.end and b.start < a.end
+
+        taken: list = []
+        for m in sorted(matches, key=sort_key):
+            if not any(overlaps(m, t) for t in taken):
+                taken.append(m)
+        return taken
+
+    def test_thousand_non_overlapping_matches_all_kept(self) -> None:
+        # 1000 disjoint 11-digit PESELs separated by spaces.
+        # Use repetition of a known-valid PESEL so the regex hits.
+        pesel = "44051401359"
+        text = " ".join([pesel] * 1000)
+        anon = Anonymizer(detectors=[PeselDetector()], mapping=Mapping())
+        matches = anon.detect(text)
+        assert len(matches) == 1000
+        # All non-overlapping
+        for a, b in pairwise(matches):
+            assert a.end <= b.start
+
+    def test_hundred_identical_span_matches_collapse_to_one(self) -> None:
+        # 100 synthetic matches cover the same span, each tagged with a
+        # different detector name. None of those names is registered with the
+        # Anonymizer, so all share the fallback priority; exactly one survives.
+        from llm_safe_pl.models import Match, PIIType
+
+        detectors_mock = [PeselDetector()]  # placeholder list for priority
+        synth_matches = [
+            Match(
+                type=PIIType.PESEL,
+                value="44051401359",
+                start=0,
+                end=11,
+                detector=f"d{i}",
+            )
+            for i in range(100)
+        ]
+        anon = Anonymizer(detectors=detectors_mock, mapping=Mapping())
+        result = anon._resolve_overlaps(synth_matches)
+        assert len(result) == 1
+
+    def test_against_naive_reference_on_large_synthetic(self) -> None:
+        # Build a mixed input: 500 non-overlapping clusters, each with 5
+        # candidate matches that mutually overlap. After resolution we should
+        # have 500 winners and the result must equal the naive implementation.
+        from llm_safe_pl.models import Match, PIIType
+
+        synth_matches: list[Match] = []
+        for cluster_idx in range(500):
+            base = cluster_idx * 100
+            # 5 overlapping matches inside this cluster; varying lengths.
+            for k in range(5):
+                synth_matches.append(
+                    Match(
+                        type=PIIType.PESEL,
+                        value="x" * (10 + k),
+                        start=base + k,
+                        end=base + 10 + k * 2,
+                        detector=f"d{k}",
+                    )
+                )
+
+        detectors_mock = [PeselDetector()]
+        anon = Anonymizer(detectors=detectors_mock, mapping=Mapping())
+        actual = anon._resolve_overlaps(list(synth_matches))
+        expected = self._naive_resolve(list(synth_matches), detectors_mock)
+
+        # Sort both by start to compare set-equivalence (algorithm preserves
+        # the same selection; final ordering is not part of the contract).
+        actual_sorted = sorted(actual, key=lambda m: m.start)
+        expected_sorted = sorted(expected, key=lambda m: m.start)
+        assert actual_sorted == expected_sorted
+        assert len(actual_sorted) == 500
diff --git a/tests/test_overlap_property.py b/tests/test_overlap_property.py
new file mode 100644
index 0000000..acb69b2
--- /dev/null
+++ b/tests/test_overlap_property.py
@@ -0,0 +1,60 @@
+"""Property test: bisect-based ``_resolve_overlaps`` matches the naive O(n^2) reference.
+
+The fast and slow algorithms must produce the same set of retained matches
+for any input. Hypothesis generates random Match objects (varied spans,
+varied detector names) and asserts equivalence. The names ``d0``..``d4`` are
+not registered with the Anonymizer, so every match shares the fallback priority.
+"""
+
+from __future__ import annotations
+
+from hypothesis import given, settings
+from hypothesis import strategies as st
+
+from llm_safe_pl.anonymizer import Anonymizer
+from llm_safe_pl.detectors.pesel import PeselDetector
+from llm_safe_pl.models import Mapping, Match, PIIType
+
+
+def _naive_resolve(matches: list[Match], detectors: list) -> list[Match]:
+    priority = {d.name: i for i, d in enumerate(detectors)}
+    fallback = len(priority)
+
+    def sort_key(m: Match) -> tuple[int, int, int]:
+        return (-(m.end - m.start), m.start, priority.get(m.detector, fallback))
+
+    def overlaps(a: Match, b: Match) -> bool:
+        return a.start < b.end and b.start < a.end
+
+    taken: list[Match] = []
+    for m in sorted(matches, key=sort_key):
+        if not any(overlaps(m, t) for t in taken):
+            taken.append(m)
+    return taken
+
+
+_match_strategy = st.builds(
+    lambda start, length, detector_idx: Match(
+        type=PIIType.PESEL,
+        value="x" * length,
+        start=start,
+        end=start + length,
+        detector=f"d{detector_idx}",
+    ),
+    start=st.integers(min_value=0, max_value=200),
+    length=st.integers(min_value=1, max_value=20),
+    detector_idx=st.integers(min_value=0, max_value=4),
+)
+
+
+@given(st.lists(_match_strategy, max_size=80))
+@settings(max_examples=200)
+def test_bisect_matches_naive_on_arbitrary_match_sets(matches: list[Match]) -> None:
+    detectors = [PeselDetector()]
+    anon = Anonymizer(detectors=detectors, mapping=Mapping())
+
+    actual = anon._resolve_overlaps(list(matches))
+    expected = _naive_resolve(list(matches), detectors)
+
+    actual_keys = sorted((m.start, m.end, m.detector) for m in actual)
+    expected_keys = sorted((m.start, m.end, m.detector) for m in expected)
+    assert actual_keys == expected_keys
diff --git a/tests/test_security_hardening.py b/tests/test_security_hardening.py
new file mode 100644
index 0000000..0a34950
--- /dev/null
+++ b/tests/test_security_hardening.py
@@ -0,0 +1,211 @@
+"""Tests covering the security/hardening changes from focused-review.md.
+
+Groups: Mapping.from_dict validation, Shield input-size guard and reset,
+Anonymizer detector-name collision and strategy rejection, Detector
+subclass enforcement.
+"""
+
+from __future__ import annotations
+
+import re
+from typing import ClassVar
+
+import pytest
+
+from llm_safe_pl.anonymizer import Anonymizer
+from llm_safe_pl.detectors.base import RegexDetector
+from llm_safe_pl.detectors.pesel import PeselDetector
+from llm_safe_pl.models import Mapping, PIIType
+from llm_safe_pl.shield import Shield
+from llm_safe_pl.strategies import Strategy
+
+# ---- Mapping.from_dict validation ------------------------------------------
+
+
+def _baseline() -> dict:
+    return {
+        "schema_version": 1,
+        "counters": {"pesel": 1},
+        "entries": [{"token": "[PESEL_001]", "type": "pesel", "value": "44051401359"}],
+    }
+
+
+class TestMappingFromDictValidation:
+    def test_baseline_round_trips(self) -> None:
+        m = Mapping.from_dict(_baseline())
+        assert m.value_for("[PESEL_001]") == "44051401359"
+
+    def test_rejects_non_dict(self) -> None:
+        with pytest.raises(ValueError, match="expected a dict"):
+            Mapping.from_dict([])  # type: ignore[arg-type]
+
+    def test_rejects_wrong_schema_version(self) -> None:
+        data = _baseline()
+        data["schema_version"] = 2
+        with pytest.raises(ValueError, match="schema version"):
+            Mapping.from_dict(data)
+
+    def test_rejects_missing_entries_field(self) -> None:
+        data = _baseline()
+        del data["entries"]
+        with pytest.raises(ValueError, match="entries"):
+            Mapping.from_dict(data)
+
+    def test_rejects_token_with_wrong_shape(self) -> None:
+        data = _baseline()
+        data["entries"][0]["token"] = "garbage"
+        with pytest.raises(ValueError, match="shape"):
+            Mapping.from_dict(data)
+
+    def test_rejects_token_prefix_type_mismatch(self) -> None:
+        data = _baseline()
+        data["entries"][0]["token"] = "[NIP_001]"  # token says NIP but type says pesel
+        with pytest.raises(ValueError, match="prefix does not match"):
+            Mapping.from_dict(data)
+
+    def
test_rejects_counter_below_observed_max(self) -> None:
+        data = _baseline()
+        data["counters"]["pesel"] = 0  # but [PESEL_001] is in entries
+        with pytest.raises(ValueError, match="counter"):
+            Mapping.from_dict(data)
+
+    def test_rejects_negative_counter(self) -> None:
+        data = _baseline()
+        data["counters"]["pesel"] = -1
+        with pytest.raises(ValueError, match="non-negative"):
+            Mapping.from_dict(data)
+
+    def test_rejects_string_counter(self) -> None:
+        data = _baseline()
+        data["counters"]["pesel"] = "1"  # type: ignore[assignment]
+        with pytest.raises(ValueError, match="non-negative int"):
+            Mapping.from_dict(data)
+
+    def test_rejects_unknown_pii_type(self) -> None:
+        data = _baseline()
+        data["entries"][0]["type"] = "ssn"
+        with pytest.raises(ValueError):
+            Mapping.from_dict(data)
+
+    def test_rejects_non_string_value(self) -> None:
+        data = _baseline()
+        data["entries"][0]["value"] = 12345  # type: ignore[assignment]
+        with pytest.raises(ValueError, match="must be strings"):
+            Mapping.from_dict(data)
+
+
+# ---- Anonymizer constructor enforcement -----------------------------------
+
+
+class _DupADetector(RegexDetector):
+    pii_type: ClassVar[PIIType] = PIIType.PESEL
+    name: ClassVar[str] = "dup"
+    pattern: ClassVar[re.Pattern[str]] = re.compile(r"AAA")
+
+
+class _DupBDetector(RegexDetector):
+    pii_type: ClassVar[PIIType] = PIIType.NIP
+    name: ClassVar[str] = "dup"  # same name as the one above
+    pattern: ClassVar[re.Pattern[str]] = re.compile(r"BBB")
+
+
+class TestAnonymizerConstructor:
+    def test_rejects_duplicate_detector_names(self) -> None:
+        with pytest.raises(ValueError, match="Duplicate detector name"):
+            Anonymizer(
+                detectors=[_DupADetector(), _DupBDetector()],
+                mapping=Mapping(),
+            )
+
+    def test_rejects_unimplemented_strategy(self) -> None:
+        # Pass an arbitrary non-Strategy object; anything other than TOKEN is rejected.
+        with pytest.raises(ValueError, match="not implemented"):
+
+            class _Fake:
+                pass
+
+            Anonymizer(
+                detectors=[PeselDetector()],
+                mapping=Mapping(),
+                strategy=_Fake(),  # type: ignore[arg-type]
+            )
+
+    def test_accepts_token_strategy_explicitly(self) -> None:
+        # Should not raise.
+        Anonymizer(
+            detectors=[PeselDetector()],
+            mapping=Mapping(),
+            strategy=Strategy.TOKEN,
+        )
+
+    def test_detect_returns_list(self) -> None:
+        # Anonymizer is the internal/mutable-list path; Shield.detect is the
+        # public-immutable-tuple path. Both must remain in their roles.
+        anon = Anonymizer(detectors=[PeselDetector()], mapping=Mapping())
+        result = anon.detect("PESEL 44051401359")
+        assert isinstance(result, list)
+
+
+# ---- Shield input-size guard + reset() -------------------------------------
+
+
+class TestShieldHardening:
+    def test_anonymize_respects_max_input_bytes(self) -> None:
+        shield = Shield(max_input_bytes=10)
+        with pytest.raises(ValueError, match="max_input_bytes"):
+            shield.anonymize("This is far longer than 10 bytes of text")
+
+    def test_detect_respects_max_input_bytes(self) -> None:
+        shield = Shield(max_input_bytes=10)
+        with pytest.raises(ValueError, match="max_input_bytes"):
+            shield.detect("This is far longer than 10 bytes of text")
+
+    def test_no_guard_by_default(self) -> None:
+        shield = Shield()
+        # Should not raise on a 10 KiB input.
+        shield.anonymize("x" * 10240)
+
+    def test_negative_max_input_bytes_rejected(self) -> None:
+        with pytest.raises(ValueError, match="non-negative"):
+            Shield(max_input_bytes=-1)
+
+    def test_reset_clears_mapping(self) -> None:
+        shield = Shield()
+        shield.anonymize("PESEL 44051401359")
+        assert len(shield.mapping) == 1
+        shield.reset()
+        assert len(shield.mapping) == 0
+
+    def test_reset_preserves_detector_list(self) -> None:
+        shield = Shield(detectors=[PeselDetector()])
+        result_a = shield.anonymize("PESEL 44051401359 i email jan@example.pl")
+        # Email is NOT in the custom detector list, so it should not be touched.
+        assert "jan@example.pl" in result_a.text
+        shield.reset()
+        result_b = shield.anonymize("PESEL 44051401359 i email jan@example.pl")
+        assert "jan@example.pl" in result_b.text  # still no email detector
+
+
+# ---- Detector __init_subclass__ enforcement -------------------------------
+
+
+class TestDetectorInitSubclass:
+    def test_concrete_detector_without_pii_type_rejected(self) -> None:
+        with pytest.raises(TypeError, match="pii_type"):
+
+            class _Bad(RegexDetector):
+                # Missing pii_type intentionally
+                name: ClassVar[str] = "bad"
+                pattern: ClassVar[re.Pattern[str]] = re.compile(r"x")
+
+    def test_concrete_detector_without_name_rejected(self) -> None:
+        with pytest.raises(TypeError, match="name"):
+
+            class _Bad(RegexDetector):
+                pii_type: ClassVar[PIIType] = PIIType.PESEL
+                # Missing name intentionally
+                pattern: ClassVar[re.Pattern[str]] = re.compile(r"x")
+
+    def test_regex_detector_helper_class_passes(self) -> None:
+        # Re-importing the abstract helper must not raise.
+        assert RegexDetector.__name__ == "RegexDetector"
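
The neighbor check that `_resolve_overlaps` adopts in this change can be tried in isolation, without the library installed. What follows is a minimal sketch on plain half-open spans, not the project's code: `Span`, `resolve_overlaps`, and `resolve_overlaps_naive` are hypothetical names chosen for illustration, and the detector-priority tie-break from the real sort key is left out.

```python
from bisect import bisect_left
from typing import NamedTuple


class Span(NamedTuple):
    """Hypothetical stand-in for a match: half-open interval [start, end)."""

    start: int
    end: int


def _sort_key(s: Span) -> tuple[int, int]:
    # Longest first, then earliest start (the priority tie-break is omitted).
    return (-(s.end - s.start), s.start)


def resolve_overlaps(spans: list[Span]) -> list[Span]:
    """Greedy selection using a sorted list and one bisect lookup per span."""
    taken: list[Span] = []  # kept sorted by start, pairwise non-overlapping
    starts: list[int] = []  # parallel list of taken[i].start, used by bisect
    for s in sorted(spans, key=_sort_key):
        i = bisect_left(starts, s.start)
        if i > 0 and taken[i - 1].end > s.start:
            continue  # overlaps the left neighbor
        if i < len(taken) and taken[i].start < s.end:
            continue  # overlaps the right neighbor
        starts.insert(i, s.start)
        taken.insert(i, s)
    return taken


def resolve_overlaps_naive(spans: list[Span]) -> list[Span]:
    """Reference: compare each candidate against every span already taken."""
    taken: list[Span] = []
    for s in sorted(spans, key=_sort_key):
        if not any(s.start < t.end and t.start < s.end for t in taken):
            taken.append(s)
    return taken


spans = [Span(0, 10), Span(5, 8), Span(10, 12), Span(11, 30)]
fast = resolve_overlaps(spans)
slow = resolve_overlaps_naive(spans)
```

The bisect variant returns the retained spans ordered by start, while the naive one returns them in selection order (longest first), so any comparison between the two should sort first, as the tests in this diff do.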