Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ dist/
Thumbs.db
.idea/
.vscode/
.claude/

# Local runtime/config artifacts
config.yaml
data/
logs/
outputs/
clawhub-zaomeng-skill/runtime/data/
.tools/
174 changes: 152 additions & 22 deletions src/skill_support/novel_preparation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@

from __future__ import annotations

import json
import re
import unicodedata
from pathlib import Path
from typing import Any

from src.utils.text_parser import load_novel_text, split_sentences

_ALIAS_REGISTRY_CACHE: dict[str, "_AliasRegistry"] = {}

_DEFAULT_ALIAS_FILE = Path(__file__).resolve().parent.parent.parent / "zaomeng-skill" / "character_aliases.json"

MIXED_EXCERPT_MIN_CHARS = 3_000
MIXED_EXCERPT_MIN_SENTENCES = 40

Expand Down Expand Up @@ -56,18 +61,113 @@
MATCH_IGNORED_PATTERN = re.compile(r"[\s\u3000\u00b7\u2027\u30fb'\"`~!@#$%^&*()_+\-=\[\]{}\\|;:,.<>/?,。!?:;、“”‘’《》【】()]")


class _AliasRegistry:
__slots__ = ("canonical_to_spec", "alias_to_canonical")

def __init__(self, canonical_to_spec: dict[str, str], alias_to_canonical: dict[str, str]):
self.canonical_to_spec = canonical_to_spec
self.alias_to_canonical = alias_to_canonical

@property
def empty(self) -> bool:
return not self.canonical_to_spec


_EMPTY_REGISTRY = _AliasRegistry({}, {})


def _load_alias_registry(alias_file: str | Path | None = None) -> _AliasRegistry:
path = Path(alias_file) if alias_file else _DEFAULT_ALIAS_FILE
cache_key = str(path)
if cache_key in _ALIAS_REGISTRY_CACHE:
return _ALIAS_REGISTRY_CACHE[cache_key]

if not path.exists():
_ALIAS_REGISTRY_CACHE[cache_key] = _EMPTY_REGISTRY
return _EMPTY_REGISTRY

try:
data = json.loads(path.read_text("utf-8"))
except (json.JSONDecodeError, OSError):
_ALIAS_REGISTRY_CACHE[cache_key] = _EMPTY_REGISTRY
return _EMPTY_REGISTRY

canonical_to_spec: dict[str, str] = {}
alias_to_canonical: dict[str, str] = {}
for canonical, aliases in data.items():
if canonical.startswith("_") or not isinstance(aliases, list):
continue
all_names = [canonical] + [str(a).strip() for a in aliases if str(a).strip()]
canonical_to_spec[canonical] = "|".join(all_names)
for name in all_names:
alias_to_canonical[name] = canonical
normalized = _normalize_match_text(name)
if normalized and normalized not in alias_to_canonical:
alias_to_canonical[normalized] = canonical

registry = _AliasRegistry(canonical_to_spec, alias_to_canonical)
_ALIAS_REGISTRY_CACHE[cache_key] = registry
return registry


def _resolve_character_aliases(
characters: list[str] | None,
registry: _AliasRegistry,
) -> list[str] | None:
if not characters or registry.empty:
return characters
resolved: list[str] = []
for name in characters:
clean = str(name or "").strip()
if not clean:
resolved.append(clean)
continue
if "|" in clean:
resolved.append(clean)
continue
if clean in registry.canonical_to_spec:
resolved.append(registry.canonical_to_spec[clean])
continue
if clean in registry.alias_to_canonical:
canonical = registry.alias_to_canonical[clean]
resolved.append(registry.canonical_to_spec[canonical])
continue
normalized = _normalize_match_text(clean)
if normalized and normalized in registry.alias_to_canonical:
canonical = registry.alias_to_canonical[normalized]
resolved.append(registry.canonical_to_spec[canonical])
continue
if normalized and normalized in registry.canonical_to_spec:
resolved.append(registry.canonical_to_spec[normalized])
continue
resolved.append(clean)
return resolved


def _canonical_name(character_spec: str) -> str:
clean = str(character_spec or "").strip()
return clean.split("|", 1)[0].strip()


def _split_aliases(character_spec: str) -> list[str]:
parts = [a.strip() for a in str(character_spec or "").split("|")]
return [a for a in parts if a]


def prepare_novel_excerpt(
text: str,
*,
max_sentences: int = 80,
max_chars: int = 12_000,
characters: list[str] | None = None,
alias_file: str | Path | None = None,
) -> str:
return build_excerpt_payload_from_text(
text,
max_sentences=max_sentences,
max_chars=max_chars,
characters=characters,
alias_file=alias_file,
)["excerpt"]


Expand All @@ -77,14 +177,19 @@ def build_excerpt_payload_from_text(
max_sentences: int = 80,
max_chars: int = 12_000,
characters: list[str] | None = None,
alias_file: str | Path | None = None,
) -> dict[str, Any]:
registry = _load_alias_registry(alias_file)
characters = _resolve_character_aliases(characters, registry)
clean = str(text or "").strip()
if not clean:
requested = _normalize_characters(characters)
canonical = [_canonical_name(c) for c in requested]
return {
"excerpt": "",
"requested_characters": _normalize_characters(characters),
"requested_characters": canonical,
"matched_characters": [],
"missing_characters": _normalize_characters(characters),
"missing_characters": canonical,
"excerpt_strategy": "empty",
"excerpt_stages": _empty_stage_blocks(),
}
Expand All @@ -102,12 +207,13 @@ def build_excerpt_payload_from_text(
return payload

selected_indices = _select_leading_indices(sentences, max_sentences=max_sentences, max_chars=max_chars)
canonical_requested = [_canonical_name(c) for c in requested]
return _build_excerpt_result(
sentences,
selected_indices,
requested_characters=requested,
requested_characters=canonical_requested,
matched_characters=[],
missing_characters=requested,
missing_characters=canonical_requested,
excerpt_strategy="leading_sentences",
max_chars=max_chars,
)
Expand All @@ -119,12 +225,14 @@ def load_prepared_novel_excerpt(
max_sentences: int = 80,
max_chars: int = 12_000,
characters: list[str] | None = None,
alias_file: str | Path | None = None,
) -> str:
return prepare_novel_excerpt(
load_novel_text(str(novel_path)),
max_sentences=max_sentences,
max_chars=max_chars,
characters=characters,
alias_file=alias_file,
)


Expand All @@ -134,13 +242,15 @@ def build_excerpt_payload(
max_sentences: int = 80,
max_chars: int = 12_000,
characters: list[str] | None = None,
alias_file: str | Path | None = None,
) -> dict[str, object]:
path = Path(novel_path)
excerpt_payload = build_excerpt_payload_from_text(
load_novel_text(str(path)),
max_sentences=max_sentences,
max_chars=max_chars,
characters=characters,
alias_file=alias_file,
)
return {
"source_path": str(path),
Expand All @@ -161,9 +271,12 @@ def _normalize_characters(characters: list[str] | None) -> list[str]:
seen: set[str] = set()
for item in list(characters or []):
name = str(item or "").strip()
if not name or name in seen:
if not name:
continue
seen.add(name)
canonical = _canonical_name(name)
if canonical in seen:
continue
seen.add(canonical)
ordered.append(name)
return ordered

Expand All @@ -174,10 +287,12 @@ def _normalize_match_text(text: str) -> str:


def _sentence_mentions_character(sentence: str, character: str) -> bool:
normalized_character = _normalize_match_text(character)
if not normalized_character:
return False
return normalized_character in _normalize_match_text(sentence)
normalized_sentence = _normalize_match_text(sentence)
for alias in _split_aliases(character):
normalized_alias = _normalize_match_text(alias)
if normalized_alias and normalized_alias in normalized_sentence:
return True
return False


def _leading_excerpt(sentences: list[str], *, max_sentences: int, max_chars: int) -> str:
Expand Down Expand Up @@ -213,19 +328,25 @@ def _character_focused_excerpt(
max_sentences: int,
max_chars: int,
) -> dict[str, Any]:
character_hits: dict[str, list[int]] = {name: [] for name in characters}
canonical_to_spec = {_canonical_name(name): name for name in characters}
character_hits: dict[str, list[int]] = {canon: [] for canon in canonical_to_spec}
all_hit_indices: list[int] = []
seen_hits: set[int] = set()

for idx, sentence in enumerate(sentences):
for name in characters:
if _sentence_mentions_character(sentence, name):
character_hits[name].append(idx)

matched = [name for name, hits in character_hits.items() if hits]
missing = [name for name in characters if not character_hits[name]]
for canon, spec in canonical_to_spec.items():
if _sentence_mentions_character(sentence, spec):
character_hits[canon].append(idx)
if idx not in seen_hits:
seen_hits.add(idx)
all_hit_indices.append(idx)

matched = [canon for canon, hits in character_hits.items() if hits]
missing = [canon for canon in canonical_to_spec if not character_hits[canon]]
if not matched:
return {
"excerpt": "",
"requested_characters": characters,
"requested_characters": list(canonical_to_spec.keys()),
"matched_characters": [],
"missing_characters": missing,
"excerpt_strategy": "leading_sentences",
Expand Down Expand Up @@ -268,14 +389,15 @@ def _character_focused_excerpt(
selected_indices,
character_hits=character_hits,
matched_characters=matched,
character_specs=list(characters),
max_sentences=max_sentences,
max_chars=max_chars,
)
augmented = True
return _build_excerpt_result(
sentences,
selected_indices,
requested_characters=characters,
requested_characters=list(canonical_to_spec.keys()),
matched_characters=matched,
missing_characters=missing,
excerpt_strategy="character_windows_mixed" if augmented else "character_windows",
Expand Down Expand Up @@ -305,6 +427,7 @@ def _augment_character_excerpt_indices(
*,
character_hits: dict[str, list[int]],
matched_characters: list[str],
character_specs: list[str] | None = None,
max_sentences: int,
max_chars: int,
) -> list[int]:
Expand Down Expand Up @@ -350,11 +473,12 @@ def add_candidates(indices: list[int], *, radius: int = 0) -> None:
if enough():
return ordered

add_candidates(_dialogue_candidate_indices(sentences, matched_characters), radius=0)
spec_list = character_specs if character_specs else matched_characters
add_candidates(_dialogue_candidate_indices(sentences, spec_list), radius=0)
if enough():
return ordered

add_candidates(_thought_or_evaluation_indices(sentences, matched_characters), radius=0)
add_candidates(_thought_or_evaluation_indices(sentences, spec_list), radius=0)
return ordered


Expand Down Expand Up @@ -505,7 +629,13 @@ def _build_representative_hit_plan(
*,
center_budget: int,
) -> list[int]:
per_character = {name: _spread_sample_indices(character_hits.get(name, []), sample_cap=3) for name in matched_characters}
per_character = {
name: _spread_sample_indices(
character_hits.get(name, []),
sample_cap=max(1, min(len(sorted(set(character_hits.get(name, [])))), center_budget)),
)
for name in matched_characters
}
ordered_centers: list[int] = []
seen: set[int] = set()

Expand Down
Loading
Loading