Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 106 additions & 16 deletions datasets/builders/build_kicad_d2_combined.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,25 @@
PERMISSIVE_TARGET = f"{HF_ORG}/kicad-d2-combined-permissive"
COPYLEFT_TARGET = f"{HF_ORG}/kicad-d2-combined-copyleft"

SOURCE_PERMISSIVE = f"{HF_ORG}/kicad9plus-permissive"
SOURCE_COPYLEFT = f"{HF_ORG}/kicad9plus-copyleft"
# Unified source corpus (chat-format JSONL with full license_spdx per row).
# The license bucket (permissive vs copyleft) is derived from metadata at load
# time, not from a separate source repo. This avoids the truncation cap that
# the per-bucket chat-format datasets (kicad9plus-permissive/copyleft) impose
# on long .kicad_sch files. We additionally filter rows where the assistant
# content is intact (≥95% of declared file_size_bytes) so kicad-cli can parse
# them — truncated rows fail "Failed to load schematic" anyway.
SOURCE_UNIFIED = "electron-rare/kicad9plus-sch-corpus"

# License → bucket mapping (Apache/MIT/CC0/EUPL/CERN-OHL-P are permissive
# re-distribution friendly; GPL/CERN-OHL-S are copyleft share-alike).
PERMISSIVE_LICENSES = {
"Apache-2.0", "MIT", "CC0-1.0", "EUPL-1.2",
"CERN-OHL-P-2.0", "BSD-3-Clause", "BSD-2-Clause", "ISC",
}
Comment on lines +69 to +72
COPYLEFT_LICENSES = {
"GPL-3.0", "GPL-3.0-or-later", "GPL-2.0", "GPL-2.0-or-later",
"CERN-OHL-S-2.0", "AGPL-3.0", "AGPL-3.0-or-later",
}

WORK_DIR = Path("/tmp/d2_build")
WORK_DIR.mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -126,26 +143,31 @@ def load_source_corpus(bucket: str, max_projects: int | None = None) -> list[dic
log.error("huggingface_hub not found; pip install huggingface-hub")
return []

source_repo = SOURCE_PERMISSIVE if bucket == "permissive" else SOURCE_COPYLEFT
cache_dir = WORK_DIR / bucket
cache_dir = WORK_DIR / "sch_corpus"
cache_dir.mkdir(parents=True, exist_ok=True)

try:
local_path = Path(snapshot_download(
repo_id=source_repo,
repo_id=SOURCE_UNIFIED,
repo_type="dataset",
local_dir=str(cache_dir),
))
Comment on lines +146 to 154
except Exception as e:
log.error("failed to download %s: %s", source_repo, e)
log.error("failed to download %s: %s", SOURCE_UNIFIED, e)
return []

dataset_jsonl = local_path / "dataset.jsonl"
if not dataset_jsonl.exists():
log.error("dataset.jsonl missing in %s", source_repo)
log.error("dataset.jsonl missing in %s", SOURCE_UNIFIED)
return []

want_licenses = (PERMISSIVE_LICENSES if bucket == "permissive"
else COPYLEFT_LICENSES)
n_seen = 0
n_wrong_bucket = 0
n_truncated = 0
projects: list[dict] = []

with open(dataset_jsonl) as f:
for line in f:
line = line.strip()
Expand All @@ -155,21 +177,38 @@ def load_source_corpus(bucket: str, max_projects: int | None = None) -> list[dic
row = json.loads(line)
except json.JSONDecodeError:
continue
n_seen += 1
msgs = row.get("messages") or []
md = row.get("metadata") or {}
license_spdx = md.get("license_spdx", "NOASSERTION")

# Filter 1: license bucket (never mix permissive/copyleft)
if license_spdx not in want_licenses:
n_wrong_bucket += 1
continue

user_prompt = next((m.get("content", "") for m in msgs
if m.get("role") == "user"), "")
sch_content = next((m.get("content", "") for m in msgs
if m.get("role") == "assistant"), "")
if not sch_content or not sch_content.lstrip().startswith("(kicad_sch"):
continue

# Filter 2: skip truncated rows — kicad-cli rejects unbalanced
# S-expressions with "Failed to load schematic"
declared = md.get("file_size_bytes", 0)
actual = len(sch_content.encode("utf-8"))
if declared > 0 and actual / declared < 0.95:
n_truncated += 1
continue

projects.append({
"project": md.get("repo", "?") + "/" + md.get("rel_path", "?"),
"source_repo": source_repo,
"source_repo": SOURCE_UNIFIED,
"source_path": md.get("rel_path", "?"),
"sch_content": sch_content,
"prompt": user_prompt,
"license_spdx": md.get("license_spdx", "NOASSERTION"),
"license_spdx": license_spdx,
"source_url": md.get("source_url", ""),
"file_sha256": md.get("file_sha256", ""),
"repo": md.get("repo", ""),
Expand All @@ -178,7 +217,8 @@ def load_source_corpus(bucket: str, max_projects: int | None = None) -> list[dic
if max_projects and len(projects) >= max_projects:
break

log.info(" loaded %d projects from %s", len(projects), source_repo)
log.info(" bucket=%s: seen=%d wrong_bucket=%d truncated=%d intact=%d",
bucket, n_seen, n_wrong_bucket, n_truncated, len(projects))
return projects


Expand Down Expand Up @@ -272,8 +312,17 @@ def run_erc_drc_for_project(project: dict) -> dict:

# ─── Step 3: programmatic noise injection ──────────────────────────────

NOISE_OPERATIONS = ["delete_wire", "displace_symbol", "drop_global_label",
"shrink_track_width"]
# Noise ops, ordered from most-specific (real designer mistakes) to most-
# universal (parser-breaking). The builder picks ops opportunistically:
# if delete_wire finds no wire to delete, it falls through to the next op.
NOISE_OPERATIONS = [
"delete_wire", # remove a (wire (pts ...)) — orphan net
"displace_symbol", # move a (symbol (at x y a)) — DRC clearance issue
"drop_global_label", # remove a (global_label "X") — unconnected net
"delete_sheet", # remove a (sheet ...) reference — missing hierarchy
"corrupt_uuid", # mutate a (uuid "...") — schematic identity broken
"truncate_tail", # cut last 5% of file — universal S-exp break
]


def inject_noise(sch_text: str, pcb_text: str | None,
Expand Down Expand Up @@ -319,7 +368,37 @@ def displace_at(m):
elif noise_op == "shrink_track_width" and pcb_text:
# In PCB, find segment with (width 0.25) and shrink to 0.05
bad_pcb = pcb_text.replace("(width 0.25)", "(width 0.05)", 1)


elif noise_op == "delete_sheet":
# Find first (sheet ... ) full block and remove it.
# KiCad sheet block: balanced parens, can be hundreds of lines.
m = re.search(r'^\s*\(sheet\b', bad_sch, re.M)
if m:
start = m.start()
depth = 0
i = bad_sch.index('(', start)
for j in range(i, len(bad_sch)):
if bad_sch[j] == '(':
depth += 1
elif bad_sch[j] == ')':
depth -= 1
if depth == 0:
bad_sch = bad_sch[:start] + bad_sch[j+1:]
break

elif noise_op == "corrupt_uuid":
# Replace first uuid value with a syntactically broken token.
bad_sch = re.sub(
r'(\(uuid\s+")([0-9a-f-]{8,})(")',
r'\1BROKEN-UUID-NOT-HEX\3',
bad_sch, count=1,
)

elif noise_op == "truncate_tail":
# Universal: cut last 5% to break the closing parens.
cut = max(1, int(len(bad_sch) * 0.05))
bad_sch = bad_sch[:-cut]

return (bad_sch, bad_pcb)


Expand All @@ -340,14 +419,25 @@ def build_triplets_from_project(project: dict, manifest_rows: list) -> list[Trip
return []
pcb_text = None # dataset has no PCB pair, see load_source_corpus()

for i in range(NOISE_VARIANTS_PER_BOARD):
noise_op = NOISE_OPERATIONS[i % len(NOISE_OPERATIONS)]
# Opportunistic loop: try every noise op available; produce a triplet for
# each op that successfully breaks ERC (exit_code != 0). Stop after
# NOISE_VARIANTS_PER_BOARD successes. Many real schematics have only some
# of the matchable primitives (e.g. a top-level hierarchy has no wires
# but has sheets), so trying all variants is the safest design.
n_success = 0
for noise_op in NOISE_OPERATIONS:
if n_success >= NOISE_VARIANTS_PER_BOARD:
break
bad_sch, _ = inject_noise(sch_text, pcb_text, noise_op, rng)
Comment on lines +422 to 431
if bad_sch == sch_text:
# The op found no primitive to mutate; try next.
continue
# Re-run ERC on the noisy version (DRC skipped, no PCB available)
bad_reports = run_erc_drc_for_project({**project, "sch_content": bad_sch})
# Skip if noise didn't actually break anything (kicad-cli still 0 errors)
if bad_reports["valid"]:
# Noise didn't break it — kicad-cli still parses. Try next op.
continue
n_success += 1

triplet = TripletRow(conversations=[
{"from": "system", "value":
Expand Down
Loading