From 12ff1c2f4617ba51da1a8e9bd7f987bec344b228 Mon Sep 17 00:00:00 2001 From: Ryo Matsunari Date: Wed, 15 Apr 2026 08:37:55 +0900 Subject: [PATCH] [codex] fix: escape yaml frontmatter and obsidian tags --- graphify/export.py | 28 +++++++++++++++++++++------- graphify/ingest.py | 9 +++++++-- tests/test_export.py | 28 +++++++++++++++++++++++++++- tests/test_ingest.py | 16 +++++++++++++--- 4 files changed, 68 insertions(+), 13 deletions(-) diff --git a/graphify/export.py b/graphify/export.py index 033ec66d..e9eaa34e 100644 --- a/graphify/export.py +++ b/graphify/export.py @@ -11,6 +11,20 @@ from graphify.security import sanitize_label from graphify.analyze import _node_community_map + +def _yaml_scalar(value: str) -> str: + """Render a string as a YAML-safe quoted scalar.""" + return json.dumps(value, ensure_ascii=False) + + +def _community_slug(label: str) -> str: + """Stable slug for tags / Dataview queries / graph color groups.""" + cleaned = _strip_diacritics(label) + cleaned = re.sub(r"[^A-Za-z0-9_-]+", "_", cleaned) + cleaned = re.sub(r"_+", "_", cleaned).strip("_") + return cleaned or "community" + + def _strip_diacritics(text: str) -> str: import unicodedata nfkd = unicodedata.normalize("NFKD", text) @@ -528,7 +542,7 @@ def _dominant_confidence(node_id: str) -> str: ftype_tag = _FTYPE_TAG.get(ftype, f"graphify/{ftype}" if ftype else "graphify/document") dom_conf = _dominant_confidence(node_id) conf_tag = f"graphify/{dom_conf}" - comm_tag = f"community/{community_name.replace(' ', '_')}" + comm_tag = f"community/{_community_slug(community_name)}" node_tags = [ftype_tag, conf_tag, comm_tag] lines: list[str] = [] @@ -536,12 +550,12 @@ def _dominant_confidence(node_id: str) -> str: # YAML frontmatter - readable in Obsidian's properties panel lines += [ "---", - f'source_file: "{data.get("source_file", "")}"', - f'type: "{ftype}"', - f'community: "{community_name}"', + f"source_file: {_yaml_scalar(data.get('source_file', ''))}", + f"type: {_yaml_scalar(ftype)}", + f"community: {_yaml_scalar(community_name)}", ] if data.get("source_location"): - lines.append(f'location: "{data["source_location"]}"') + lines.append(f"location: {_yaml_scalar(data['source_location'])}") # Add tags list to frontmatter lines.append("tags:") for tag in node_tags: @@ -640,7 +654,7 @@ def _community_reach(node_id: str) -> int: lines.append("") # Dataview live query (improvement 2) - comm_tag_name = community_name.replace(" ", "_") + comm_tag_name = _community_slug(community_name) lines.append("## Live Query (requires Dataview plugin)") lines.append("") lines.append("```dataview") @@ -691,7 +705,7 @@ def _community_reach(node_id: str) -> int: graph_config = { "colorGroups": [ { - "query": f"tag:#community/{label.replace(' ', '_')}", + "query": f"tag:#community/{_community_slug(label)}", "color": {"a": 1, "rgb": int(COMMUNITY_COLORS[cid % len(COMMUNITY_COLORS)].lstrip('#'), 16)} } for cid, label in sorted((community_labels or {}).items()) diff --git a/graphify/ingest.py b/graphify/ingest.py index 62d8386b..09c8ec41 100644 --- a/graphify/ingest.py +++ b/graphify/ingest.py @@ -15,6 +15,11 @@ def _yaml_str(s: str) -> str: return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", " ").replace("\r", " ") +def _yaml_list_items(values: list[str]) -> list[str]: + """Render YAML list items using the same escaping as scalar fields.""" + return [f' - "{_yaml_str(value)}"' for value in values] + + def _safe_filename(url: str, suffix: str) -> str: """Turn a URL into a safe filename.""" parsed = urllib.parse.urlparse(url) @@ -263,8 +268,8 @@ def save_query_result( 'contributor: "graphify"', ] if source_nodes: - nodes_str = ", ".join(f'"{n}"' for n in source_nodes[:10]) - frontmatter_lines.append(f"source_nodes: [{nodes_str}]") + frontmatter_lines.append("source_nodes:") + frontmatter_lines.extend(_yaml_list_items(source_nodes[:10])) frontmatter_lines.append("---") body_lines = [ diff --git a/tests/test_export.py b/tests/test_export.py index 6f4421df..fd8aea76 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -1,9 +1,10 @@ import json import tempfile from pathlib import Path +import networkx as nx from graphify.build import build_from_json from graphify.cluster import cluster -from graphify.export import to_json, to_cypher, to_graphml, to_html +from graphify.export import to_json, to_cypher, to_graphml, to_html, to_obsidian FIXTURES = Path(__file__).parent / "fixtures" @@ -125,3 +126,28 @@ def test_to_html_contains_nodes_and_edges(): content = out.read_text() assert "RAW_NODES" in content assert "RAW_EDGES" in content + + +def test_to_obsidian_escapes_yaml_and_slugs_community_tags(tmp_path): + G = nx.Graph() + G.add_node( + "n1", + label='bad"label', + source_file='src/evil"file.py', + file_type="code", + source_location="L1", + ) + communities = {0: ["n1"]} + labels = {0: 'Comm"One / Beta'} + + to_obsidian(G, communities, str(tmp_path), community_labels=labels) + + node_note = (tmp_path / "badlabel.md").read_text(encoding="utf-8") + community_note = (tmp_path / "_COMMUNITY_CommOne Beta.md").read_text(encoding="utf-8") + graph_conf = json.loads((tmp_path / ".obsidian" / "graph.json").read_text(encoding="utf-8")) + + assert 'source_file: "src/evil\\"file.py"' in node_note + assert 'community: "Comm\\"One / Beta"' in node_note + assert "#community/Comm_One_Beta" in node_note + assert "TABLE source_file, type FROM #community/Comm_One_Beta" in community_note + assert graph_conf["colorGroups"][0]["query"] == "tag:#community/Comm_One_Beta" diff --git a/tests/test_ingest.py b/tests/test_ingest.py index 41128eee..d154f656 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -48,9 +48,10 @@ def test_source_nodes_capped_at_10(tmp_path): nodes = [f"Node{i}" for i in range(20)] out = save_query_result("q", "a", mem, source_nodes=nodes) content = out.read_text() - # Only first 10 should appear in frontmatter source_nodes line - fm_line = [l for l in content.splitlines() if l.startswith("source_nodes:")][0] - assert fm_line.count('"Node') == 10 + lines = content.splitlines() + start = lines.index("source_nodes:") + list_items = [line for line in lines[start + 1:start + 11] if line.startswith(' - "Node')] + assert len(list_items) == 10 def test_memory_dir_created(tmp_path): @@ -66,3 +67,12 @@ def test_answer_in_body(tmp_path): out = save_query_result("what is the answer?", answer, mem) content = out.read_text() assert answer in content + + +def test_source_nodes_are_yaml_escaped(tmp_path): + mem = tmp_path / "memory" + out = save_query_result("q", "a", mem, source_nodes=['node"x', "line\nbreak"]) + content = out.read_text() + assert 'source_nodes:' in content + assert ' - "node\\"x"' in content + assert ' - "line break"' in content