diff --git a/datasets/builders/build_kicad_d2_combined.py b/datasets/builders/build_kicad_d2_combined.py index 0c2dfc2..c7d5fdb 100644 --- a/datasets/builders/build_kicad_d2_combined.py +++ b/datasets/builders/build_kicad_d2_combined.py @@ -55,8 +55,25 @@ PERMISSIVE_TARGET = f"{HF_ORG}/kicad-d2-combined-permissive" COPYLEFT_TARGET = f"{HF_ORG}/kicad-d2-combined-copyleft" -SOURCE_PERMISSIVE = f"{HF_ORG}/kicad9plus-permissive" -SOURCE_COPYLEFT = f"{HF_ORG}/kicad9plus-copyleft" +# Unified source corpus (chat-format JSONL with full license_spdx per row). +# The license bucket (permissive vs copyleft) is derived from metadata at load +# time, not from a separate source repo. This avoids the truncation cap that +# the per-bucket chat-format datasets (kicad9plus-permissive/copyleft) impose +# on long .kicad_sch files. We additionally filter rows where the assistant +# content is intact (≥95% of declared file_size_bytes) so kicad-cli can parse +# them — truncated rows fail "Failed to load schematic" anyway. +SOURCE_UNIFIED = "electron-rare/kicad9plus-sch-corpus" + +# License → bucket mapping (Apache/MIT/CC0/EUPL/CERN-OHL-P are permissive +# re-distribution friendly; GPL/CERN-OHL-S are copyleft share-alike). +PERMISSIVE_LICENSES = { + "Apache-2.0", "MIT", "CC0-1.0", "EUPL-1.2", + "CERN-OHL-P-2.0", "BSD-3-Clause", "BSD-2-Clause", "ISC", +} +COPYLEFT_LICENSES = { + "GPL-3.0", "GPL-3.0-or-later", "GPL-2.0", "GPL-2.0-or-later", + "CERN-OHL-S-2.0", "AGPL-3.0", "AGPL-3.0-or-later", +} WORK_DIR = Path("/tmp/d2_build") WORK_DIR.mkdir(parents=True, exist_ok=True) @@ -126,26 +143,31 @@ def load_source_corpus(bucket: str, max_projects: int | None = None) -> list[dic log.error("huggingface_hub not found; pip install huggingface-hub") return [] - source_repo = SOURCE_PERMISSIVE if bucket == "permissive" else SOURCE_COPYLEFT - cache_dir = WORK_DIR / bucket + cache_dir = WORK_DIR / "sch_corpus" cache_dir.mkdir(parents=True, exist_ok=True) try: local_path = Path(snapshot_download( - repo_id=source_repo, + repo_id=SOURCE_UNIFIED, repo_type="dataset", local_dir=str(cache_dir), )) except Exception as e: - log.error("failed to download %s: %s", source_repo, e) + log.error("failed to download %s: %s", SOURCE_UNIFIED, e) return [] dataset_jsonl = local_path / "dataset.jsonl" if not dataset_jsonl.exists(): - log.error("dataset.jsonl missing in %s", source_repo) + log.error("dataset.jsonl missing in %s", SOURCE_UNIFIED) return [] + want_licenses = (PERMISSIVE_LICENSES if bucket == "permissive" + else COPYLEFT_LICENSES) + n_seen = 0 + n_wrong_bucket = 0 + n_truncated = 0 projects: list[dict] = [] + with open(dataset_jsonl) as f: for line in f: line = line.strip() @@ -155,21 +177,38 @@ def load_source_corpus(bucket: str, max_projects: int | None = None) -> list[dic row = json.loads(line) except json.JSONDecodeError: continue + n_seen += 1 msgs = row.get("messages") or [] md = row.get("metadata") or {} + license_spdx = md.get("license_spdx", "NOASSERTION") + + # Filter 1: license bucket (never mix permissive/copyleft) + if license_spdx not in want_licenses: + n_wrong_bucket += 1 + continue + user_prompt = next((m.get("content", "") for m in msgs if m.get("role") == "user"), "") sch_content = next((m.get("content", "") for m in msgs if m.get("role") == "assistant"), "") if not sch_content or not sch_content.lstrip().startswith("(kicad_sch"): continue + + # Filter 2: skip truncated rows — kicad-cli rejects unbalanced + # S-expressions with "Failed to load schematic" + declared = md.get("file_size_bytes", 0) + actual = len(sch_content.encode("utf-8")) + if declared > 0 and actual / declared < 0.95: + n_truncated += 1 + continue + projects.append({ "project": md.get("repo", "?") + "/" + md.get("rel_path", "?"), - "source_repo": source_repo, + "source_repo": SOURCE_UNIFIED, "source_path": md.get("rel_path", "?"), "sch_content": sch_content, "prompt": user_prompt, - "license_spdx": md.get("license_spdx", "NOASSERTION"), + "license_spdx": license_spdx, "source_url": md.get("source_url", ""), "file_sha256": md.get("file_sha256", ""), "repo": md.get("repo", ""), @@ -178,7 +217,8 @@ def load_source_corpus(bucket: str, max_projects: int | None = None) -> list[dic if max_projects and len(projects) >= max_projects: break - log.info(" loaded %d projects from %s", len(projects), source_repo) + log.info(" bucket=%s: seen=%d wrong_bucket=%d truncated=%d intact=%d", + bucket, n_seen, n_wrong_bucket, n_truncated, len(projects)) return projects @@ -272,8 +312,17 @@ def run_erc_drc_for_project(project: dict) -> dict: # ─── Step 3: programmatic noise injection ────────────────────────────── -NOISE_OPERATIONS = ["delete_wire", "displace_symbol", "drop_global_label", - "shrink_track_width"] +# Noise ops, ordered from most-specific (real designer mistakes) to most- +# universal (parser-breaking). The builder picks ops opportunistically: +# if delete_wire finds no wire to delete, it falls through to the next op. +NOISE_OPERATIONS = [ + "delete_wire", # remove a (wire (pts ...)) — orphan net + "displace_symbol", # move a (symbol (at x y a)) — DRC clearance issue + "drop_global_label", # remove a (global_label "X") — unconnected net + "delete_sheet", # remove a (sheet ...) reference — missing hierarchy + "corrupt_uuid", # mutate a (uuid "...") — schematic identity broken + "truncate_tail", # cut last 5% of file — universal S-exp break +] def inject_noise(sch_text: str, pcb_text: str | None, @@ -319,7 +368,37 @@ def displace_at(m): elif noise_op == "shrink_track_width" and pcb_text: # In PCB, find segment with (width 0.25) and shrink to 0.05 bad_pcb = pcb_text.replace("(width 0.25)", "(width 0.05)", 1) - + + elif noise_op == "delete_sheet": + # Find first (sheet ... ) full block and remove it. + # KiCad sheet block: balanced parens, can be hundreds of lines. + m = re.search(r'^\s*\(sheet\b', bad_sch, re.M) + if m: + start = m.start() + depth = 0 + i = bad_sch.index('(', start) + for j in range(i, len(bad_sch)): + if bad_sch[j] == '(': + depth += 1 + elif bad_sch[j] == ')': + depth -= 1 + if depth == 0: + bad_sch = bad_sch[:start] + bad_sch[j+1:] + break + + elif noise_op == "corrupt_uuid": + # Replace first uuid value with a syntactically broken token. + bad_sch = re.sub( + r'(\(uuid\s+")([0-9a-f-]{8,})(")', + r'\1BROKEN-UUID-NOT-HEX\3', + bad_sch, count=1, + ) + + elif noise_op == "truncate_tail": + # Universal: cut last 5% to break the closing parens. + cut = max(1, int(len(bad_sch) * 0.05)) + bad_sch = bad_sch[:-cut] + return (bad_sch, bad_pcb) @@ -340,14 +419,25 @@ def build_triplets_from_project(project: dict, manifest_rows: list) -> list[Trip return [] pcb_text = None # dataset has no PCB pair, see load_source_corpus() - for i in range(NOISE_VARIANTS_PER_BOARD): - noise_op = NOISE_OPERATIONS[i % len(NOISE_OPERATIONS)] + # Opportunistic loop: try every noise op available; produce a triplet for + # each op that successfully breaks ERC (exit_code != 0). Stop after + # NOISE_VARIANTS_PER_BOARD successes. Many real schematics have only some + # of the matchable primitives (e.g. a top-level hierarchy has no wires + # but has sheets), so trying all variants is the safest design. + n_success = 0 + for noise_op in NOISE_OPERATIONS: + if n_success >= NOISE_VARIANTS_PER_BOARD: + break bad_sch, _ = inject_noise(sch_text, pcb_text, noise_op, rng) + if bad_sch == sch_text: + # The op found no primitive to mutate; try next. + continue # Re-run ERC on the noisy version (DRC skipped, no PCB available) bad_reports = run_erc_drc_for_project({**project, "sch_content": bad_sch}) - # Skip if noise didn't actually break anything (kicad-cli still 0 errors) if bad_reports["valid"]: + # Noise didn't break it — kicad-cli still parses. Try next op. continue + n_success += 1 triplet = TripletRow(conversations=[ {"from": "system", "value":