Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions core/json_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""JSON resource loaders for migration assembly."""

from __future__ import annotations

import json
from pathlib import Path

from rule_engine.normalization import normalize

# Repository root, derived from this file's location (core/ -> repo root).
REPO_ROOT = Path(__file__).resolve().parent.parent
# Directory holding the JSON mapping/list resources read by the loaders below.
MAPPINGS_DIR = REPO_ROOT / "resources" / "mappings"


def load_mapping(name: str, directory: Path | None = None) -> dict[str, str]:
    """Load a JSON mapping file, by default from resources/mappings/.

    Args:
        name: Base file name without the ``.json`` suffix.
        directory: Optional directory to load from; defaults to
            ``MAPPINGS_DIR``. Added as a backward-compatible parameter so
            tests and alternate resource roots can reuse the loader.

    Returns:
        The decoded mapping, or an empty dict when the file is absent.
    """
    base = MAPPINGS_DIR if directory is None else directory
    path = base / f"{name}.json"
    # EAFP instead of exists()+open(): avoids the check-then-use race and one
    # extra stat call. Explicit UTF-8 keeps behavior stable across platforms.
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {}
    return json.loads(text)


def load_set(name: str, directory: Path | None = None) -> set[str]:
    """Load a JSON list (or object) file as a set of normalized strings.

    Args:
        name: Base file name without the ``.json`` suffix.
        directory: Optional directory to load from; defaults to
            ``MAPPINGS_DIR``. Backward-compatible addition mirroring
            ``load_mapping``.

    Returns:
        Normalized items (a JSON array contributes its elements, a JSON
        object its keys), or an empty set when the file is absent.
    """
    base = MAPPINGS_DIR if directory is None else directory
    path = base / f"{name}.json"
    # EAFP + explicit UTF-8, consistent with load_mapping above.
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return set()
    data = json.loads(text)
    # A dict iterates its keys directly; no need for .keys().
    items = data if isinstance(data, list) else data
    return {normalize(item) for item in items}
48 changes: 48 additions & 0 deletions core/run_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Shared runtime state for sequential subject classification."""

from __future__ import annotations

from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RunState:
    """Mutable state shared by packs during sequential execution."""

    # The work record being classified; packs read from it.
    work: Mapping[str, Any]
    # Accumulated proposed tags, keyed by output type.
    result: dict[str, list[str]]
    original_subjects: list[str] = field(default_factory=list)
    remaining_subjects: list[str] = field(default_factory=list)
    removed_subjects: list[str] = field(default_factory=list)
    retained_matched_subjects: set[str] = field(default_factory=set)
    subject_matches: list[dict[str, str]] = field(default_factory=list)

    def add(self, output_type: str, value: str) -> None:
        """Append *value* under *output_type*, creating the bucket on first use
        and skipping duplicates."""
        bucket = self.result.setdefault(output_type, [])
        if value not in bucket:
            bucket.append(value)

    def record_subject_match(
        self,
        raw: str,
        output_type: str,
        value: str,
        action: str,
    ) -> None:
        """Log that subject *raw* produced *value* in *output_type* via *action*."""
        entry = {
            "subject": raw,
            "output_type": output_type,
            "value": value,
            "action": action,
        }
        self.subject_matches.append(entry)

    def record_removed_subject(self, raw: str) -> None:
        """Record that *raw* was removed from the subject list."""
        self.removed_subjects.append(raw)

    def record_retained_subject(self, raw: str) -> None:
        """Record that *raw* matched a rule but is retained, keyed by its
        normalized (stripped, lower-cased) form."""
        self.retained_matched_subjects.add(raw.strip().lower())
65 changes: 65 additions & 0 deletions core/subject_classifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Reusable classification core for subject migration."""

from __future__ import annotations

from collections.abc import Iterable, Mapping
from typing import Any

from core.run_state import RunState

# Default output buckets pre-created for every classified work.
# "reading_level", "classification_codes", and "unmapped" are filled by
# SubjectDiagnosticsPack for subjects no other pack claimed.
DEFAULT_OUTPUT_TYPES = (
    "literary_form",
    "audience",
    "genres",
    "subgenres",
    "content_formats",
    "moods",
    "literary_themes",
    "literary_tropes",
    "main_topics",
    "sub_topics",
    "people",
    "places",
    "times",
    "things",
    "reading_level",
    "classification_codes",
    "unmapped",
)


class SubjectClassifier:
    """Public orchestration layer for work-level subject classification."""

    def __init__(
        self,
        rule_packs: Iterable[Any],
        output_types: Iterable[str] | None = None,
    ) -> None:
        """Store the packs to run and the tag buckets to pre-create.

        ``output_types`` falls back to ``DEFAULT_OUTPUT_TYPES`` only when
        omitted (``None``). The previous ``output_types or DEFAULT_...``
        truthiness test silently replaced an explicitly-passed empty
        iterable with the defaults, and treated always-truthy (even empty)
        generators inconsistently with empty lists.
        """
        self.rule_packs = list(rule_packs)
        if output_types is None:
            self.output_types: tuple[str, ...] = DEFAULT_OUTPUT_TYPES
        else:
            self.output_types = tuple(output_types)

    def classify_work(self, work: Mapping[str, Any]) -> dict[str, list[str]]:
        """Return only the proposed tags for compatibility callers."""
        return self.classify_work_report(work)["proposed_tags"]

    def classify_work_report(self, work: Mapping[str, Any]) -> dict[str, Any]:
        """Run the enabled rule packs against a normalized work object.

        Returns:
            A dict with "proposed_tags" (bucket -> values),
            "subject_proposal" (original / removed / remaining subject
            lists), and "subject_matches" (per-match audit records).
        """
        original_subjects = list(work.get("subjects", []))
        state = RunState(
            work=work,
            result={tag_type: [] for tag_type in self.output_types},
            original_subjects=original_subjects,
            remaining_subjects=list(original_subjects),
        )
        # Packs share one mutable state and run in registration order.
        for pack in self.rule_packs:
            pack.apply(state)
        return {
            "proposed_tags": state.result,
            "subject_proposal": {
                "original": state.original_subjects,
                "removed": state.removed_subjects,
                "remaining": state.remaining_subjects,
            },
            "subject_matches": state.subject_matches,
        }
20 changes: 20 additions & 0 deletions demo_content_formats.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"key": "/works/OLDEMO1W",
"subjects": [
"Memoirs",
"Anthology",
"Letters",
"Dictionary",
"Biography",
"Autobiography",
"Manga",
"Encyclopedia",
"Novel",
"format:Diary",
"abc",
"Grade 4"
],
"subject_people": [],
"subject_places": [],
"subject_times": []
}
File renamed without changes.
5 changes: 5 additions & 0 deletions rule_engine/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Low-level rule engine primitives."""

from .base import RulePack

__all__ = ["RulePack"]
15 changes: 15 additions & 0 deletions rule_engine/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Rule-pack interface for the migration core."""

from __future__ import annotations

from core.run_state import RunState


class RulePack:
    """A bounded unit of classification logic for one or more output types."""

    # Identifier for this pack; subclasses override (e.g. "content_formats").
    name = ""
    # Output buckets this pack writes to; subclasses override.
    output_types: tuple[str, ...] = ()

    def apply(self, state: RunState) -> None:
        """Run this pack against the shared run state; subclasses must implement."""
        raise NotImplementedError
24 changes: 24 additions & 0 deletions rule_engine/normalization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Normalization and classification helpers."""

from __future__ import annotations

import re

# Matches reading-level subjects such as "Grade 4" or "RL 3" anywhere in the
# string (used with .search).
# NOTE(review): the unescaped "." in "reading level.grade" matches any single
# character (a literal separator was likely intended). Harmless today because
# the bare "grade\s*\d+" alternative already covers those strings — confirm.
READING_LEVEL_RE = re.compile(
    r"reading level.grade\s*\d+|grade\s*\d+|rl\s*\d+", re.IGNORECASE
)
# Anchored at the start (used with .match): three-digit Dewey-style numbers
# with optional decimals, or 1-3 letters followed by digits — presumably
# LC-style call-number prefixes; verify against real subject data.
CLASSIFICATION_RE = re.compile(
    r"^[0-9]{3}(\.[0-9]+)?$|^[a-z]{1,3}\s*[0-9]+|^pr[0-9]", re.IGNORECASE
)


def normalize(value: str) -> str:
    """Return *value* with surrounding whitespace removed and lower-cased."""
    return value.strip().lower()


def is_reading_level(value: str) -> bool:
    """Return True when *value* contains a reading-level phrase (e.g. "Grade 4")."""
    return READING_LEVEL_RE.search(value) is not None


def is_classification_code(value: str) -> bool:
    """Return True when the stripped *value* starts like a classification code."""
    return CLASSIFICATION_RE.match(value.strip()) is not None
18 changes: 18 additions & 0 deletions rule_packs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Concrete rule-pack modules."""

from .content_formats import ContentFormatsPack
from .subject_diagnostics import SubjectDiagnosticsPack

# Packs that classify from the work's subject list.
# NOTE(review): SubjectDiagnosticsPack is imported and exported below but not
# registered in any tuple here — confirm that is intentional.
SUBJECT_PACK_CLASSES = (ContentFormatsPack,)

# Packs that classify from other work fields; none implemented yet.
FIELD_PACK_CLASSES = ()

# All concrete pack classes.
ALL_PACK_CLASSES = SUBJECT_PACK_CLASSES + FIELD_PACK_CLASSES
Comment on lines +3 to +10

__all__ = [
"ALL_PACK_CLASSES",
"ContentFormatsPack",
"FIELD_PACK_CLASSES",
"SUBJECT_PACK_CLASSES",
"SubjectDiagnosticsPack",
]
57 changes: 57 additions & 0 deletions rule_packs/content_formats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Rule pack for content_formats tags."""

from __future__ import annotations

from collections.abc import Mapping

from core.json_loader import load_mapping
from rule_packs.subject_migration import SubjectMigrationPack
from rules import MappingRule, PrefixRule

MOVE = "move"
EXTRACT_ONLY = "extract_only"

# First-pass direct-match policies based on current dry-run evidence.
MOVE_TAGS = frozenset(
{
"Memoir",
"Anthology",
"Letters",
"Dictionary",
}
)


class ContentFormatsPack(SubjectMigrationPack):
    """Rule pack producing tags for the ``content_formats`` output type."""

    name = "content_formats"
    output_types = ("content_formats",)
    output_type = "content_formats"

    def __init__(
        self,
        move_mapping: Mapping[str, str] | None = None,
        extract_only_mapping: Mapping[str, str] | None = None,
    ) -> None:
        """Assemble the rule chain: prefix extraction first, then the two
        direct mappings (move before extract-only)."""
        prefix_rule = PrefixRule("format", action=EXTRACT_ONLY)
        move_rule = MappingRule(move_mapping, default_action=MOVE)
        extract_rule = MappingRule(extract_only_mapping, default_action=EXTRACT_ONLY)
        self.rules = (prefix_rule, move_rule, extract_rule)

    @classmethod
    def default(cls) -> "ContentFormatsPack":
        """Build a pack from resources/mappings/content_formats.json,
        partitioned by the MOVE_TAGS policy."""
        mapping = load_mapping("content_formats")
        move_entries: dict[str, str] = {}
        extract_entries: dict[str, str] = {}
        # Entries whose canonical tag is in MOVE_TAGS use the move policy;
        # every other entry is extract-only.
        for legacy, canonical in mapping.items():
            bucket = move_entries if canonical in MOVE_TAGS else extract_entries
            bucket[legacy] = canonical
        return cls(
            move_mapping=move_entries,
            extract_only_mapping=extract_entries,
        )
48 changes: 48 additions & 0 deletions rule_packs/subject_diagnostics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Rule pack for dropped, reading-level, classification, and unmapped subjects."""

from __future__ import annotations

from core.json_loader import load_set
from core.run_state import RunState
from rule_engine.base import RulePack
from rule_engine.normalization import (
is_classification_code,
is_reading_level,
normalize,
)


class SubjectDiagnosticsPack(RulePack):
    """Buckets leftover subjects into reading-level, classification-code, or
    unmapped diagnostics."""

    name = "subject_diagnostics"
    output_types = ("reading_level", "classification_codes", "unmapped")

    def __init__(self, droppable: set[str] | None = None) -> None:
        """``droppable`` is a set of normalized subjects to skip entirely."""
        self.droppable = set(droppable or ())

    def apply(self, state: RunState) -> None:
        """Classify every remaining subject not dropped or already retained."""
        for raw in state.remaining_subjects:
            normalized = normalize(raw)
            # Skip configured drops and subjects a retaining pack already
            # claimed.
            if normalized in self.droppable:
                continue
            if normalized in state.retained_matched_subjects:
                continue
            cleaned = raw.strip()
            if not cleaned:
                continue
            if is_reading_level(raw):
                state.add("reading_level", cleaned)
            elif is_classification_code(raw):
                state.add("classification_codes", cleaned)
            else:
                state.add("unmapped", cleaned)

    @classmethod
    def default(cls) -> "SubjectDiagnosticsPack":
        """Build the pack using the droppable set from resources/mappings/."""
        return cls(droppable=load_set("droppable"))
Loading