Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/cosift/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,14 @@ func run(cfgPath string) error {
if err := runDomainAudit(ctx, flag.Args()[1:]); err != nil {
return fmt.Errorf("domain-audit: %w", err)
}
case "purge-adult":
if err := runPurgeAdult(ctx, flag.Args()[1:]); err != nil {
return fmt.Errorf("purge-adult: %w", err)
}
case "purge-domain":
if err := runPurgeDomain(ctx, flag.Args()[1:]); err != nil {
return fmt.Errorf("purge-domain: %w", err)
}
case "verify":
if err := runVerifyPebble(ctx, cfg, flag.Args()[1:]); err != nil {
return fmt.Errorf("verify: %w", err)
Expand Down
431 changes: 382 additions & 49 deletions cmd/cosift/pebble_serve.go

Large diffs are not rendered by default.

157 changes: 157 additions & 0 deletions cmd/cosift/purge_adult.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"sort"

"github.com/pilot-protocol/cosift/internal/adultfilter"
"github.com/pilot-protocol/cosift/internal/store"
)

// runPurgeAdult implements the "purge-adult" subcommand. It opens the
// PebbleStore at -dir, walks every document via IterDocMeta, classifies each
// one with adultfilter.Classify (title + URL, plus the stored body text when
// -deep is set), and soft-deletes the matches with store.SoftDeleteDocument
// so they vanish from retrieval.
//
// DRY RUN BY DEFAULT. Without -apply nothing is deleted: the sweep only counts
// matches and prints a report to stderr — a histogram of the TLDs and hosts
// that dominate the matches plus up to 20 sample URLs — so an operator can see
// exactly what the filter would remove (and spot any TLD worth adding to the
// classifier's blocklist) before committing. Pass -apply to delete.
//
// Soft delete leaves the inverted-index postings as orphans — harmless, since
// retrieval skips any docID whose meta is gone — so the sweep is a handful of
// point-deletes per doc rather than a full index rewrite. That is what makes
// it tractable across a multi-million-doc corpus.
//
// Under -deep a failed body fetch does not abort the sweep: the document is
// still classified on URL + title. Such failures are counted, the first few
// are logged, and the total is reported at the end so the operator knows the
// deep scan did not cover every body.
//
// -readonly opens the store with store.OpenPebbleReadOnly and is rejected in
// combination with -apply. -limit only has an effect together with -apply: the
// sweep stops once that many documents have been deleted.
//
//	cosift purge-adult -dir /data/pebble                 # dry run + report
//	cosift purge-adult -dir /data/pebble -apply          # delete (URL+title)
//	cosift purge-adult -dir /data/pebble -deep -apply    # also scan body text
//
// It returns an error for invalid flag combinations, when the store cannot be
// opened, or when the sweep fails (including a failed delete).
func runPurgeAdult(ctx context.Context, args []string) error {
	fs := flag.NewFlagSet("purge-adult", flag.ExitOnError)
	dir := fs.String("dir", "", "PebbleStore directory (required; same dir as pebble-serve -dir)")
	apply := fs.Bool("apply", false, "actually soft-delete matches (default: dry run, report only)")
	deep := fs.Bool("deep", false, "fetch full body text per doc for lexical scan (slower, higher recall)")
	limit := fs.Int("limit", 0, "stop after deleting this many docs (0 = no limit)")
	topHosts := fs.Int("top-hosts", 25, "how many top offending hosts/TLDs to print in the report")
	readonly := fs.Bool("readonly", false, "open the store read-only (no lock) — runs alongside a live pebble-serve; forces dry run")
	if err := fs.Parse(args); err != nil {
		return err
	}
	if *dir == "" {
		return fmt.Errorf("-dir required")
	}
	if *readonly && *apply {
		return fmt.Errorf("-readonly cannot be combined with -apply (read-only opens take no write lock)")
	}

	var ps *store.PebbleStore
	var err error
	if *readonly {
		ps, err = store.OpenPebbleReadOnly(*dir)
	} else {
		ps, err = store.OpenPebble(*dir)
	}
	if err != nil {
		return fmt.Errorf("open store: %w", err)
	}
	defer ps.Close()

	// Only the middle CorpusStats value is used (reported below as
	// indexed_docs); the other two results are deliberately discarded because
	// the counts are informational only.
	_, before, _ := ps.CorpusStats(ctx)
	mode := "DRY RUN (no deletes)"
	if *apply {
		mode = "APPLY (soft-deleting matches)"
	}
	fmt.Fprintf(os.Stderr, "purge-adult: %s — scanning %d docs (deep=%v)\n", mode, before, *deep)

	// maxBodyErrLogs caps per-document warnings so a systematically failing
	// body fetch cannot flood stderr; the total is still reported at the end.
	const maxBodyErrLogs = 10

	var scanned, matched, deleted, bodyErrs int64
	tldHist := map[string]int64{}
	hostHist := map[string]int64{}
	var samples []string

	err = ps.IterDocMeta(ctx, func(docID int64, url, title string) error {
		scanned++
		if scanned%500_000 == 0 {
			fmt.Fprintf(os.Stderr, "purge-adult: scanned %d, matched %d, deleted %d\n", scanned, matched, deleted)
		}

		body := ""
		if *deep {
			d, e := ps.GetDocByID(ctx, docID)
			switch {
			case e != nil:
				// Best effort: keep sweeping on URL+title, but make the
				// degraded coverage visible instead of hiding it.
				bodyErrs++
				if bodyErrs <= maxBodyErrLogs {
					fmt.Fprintf(os.Stderr, "purge-adult: warning: doc %d: body fetch failed, classifying on URL+title only: %v\n", docID, e)
				}
			case d != nil:
				body = d.Text
			}
		}
		adult, score, reason := adultfilter.Classify(title, body, url)
		if !adult {
			return nil
		}
		matched++
		host := hostFromURL(url)
		hostHist[host]++
		tldHist["."+tldOfHost(host)]++
		if len(samples) < 20 {
			samples = append(samples, fmt.Sprintf("[score=%d %s] %s", score, reason, url))
		}

		if *apply {
			ok, derr := ps.SoftDeleteDocument(ctx, docID, url)
			if derr != nil {
				return fmt.Errorf("delete doc %d: %w", docID, derr)
			}
			if ok {
				deleted++
			}
			if *limit > 0 && deleted >= int64(*limit) {
				return errStopSweep
			}
		}
		return nil
	})
	// errStopSweep only signals that -limit was reached; it is not a failure.
	if err != nil && err != errStopSweep {
		return fmt.Errorf("sweep: %w", err)
	}

	_, after, _ := ps.CorpusStats(ctx)
	fmt.Fprintf(os.Stderr, "\npurge-adult: done — scanned=%d matched=%d deleted=%d\n", scanned, matched, deleted)
	if bodyErrs > 0 {
		fmt.Fprintf(os.Stderr, "purge-adult: warning: %d body fetches failed under -deep; those docs were classified on URL+title only\n", bodyErrs)
	}
	fmt.Fprintf(os.Stderr, "purge-adult: corpus indexed_docs %d → %d\n", before, after)

	printHist(os.Stderr, "top offending TLDs", tldHist, *topHosts)
	printHist(os.Stderr, "top offending hosts", hostHist, *topHosts)
	if len(samples) > 0 {
		fmt.Fprintln(os.Stderr, "\nsample matches:")
		for _, s := range samples {
			fmt.Fprintln(os.Stderr, " "+s)
		}
	}
	if !*apply && matched > 0 {
		fmt.Fprintf(os.Stderr, "\npurge-adult: DRY RUN — re-run with -apply to soft-delete the %d matched docs.\n", matched)
	}
	return nil
}

// errStopSweep is the sentinel an IterDocMeta callback returns to end a sweep
// early once -limit deletes have been made. It is not a real failure: both
// runPurgeAdult and runPurgeDomain compare the sweep's result against it with
// != and treat a match as a normal stop, so callbacks return it unwrapped.
var errStopSweep = fmt.Errorf("purge-adult: stop sweep")

// printHist writes the largest entries of histogram h to w under the heading
// label, one "count key" row per line, ordered by descending count.
//
// Rows with equal counts are ordered by ascending key. Without that tie-break
// the output — and, when the list is truncated, even the set of rows shown —
// would vary from run to run, because Go map iteration order is randomized and
// sort.Slice is not a stable sort.
//
// top limits the number of rows printed; a value <= 0 prints every entry. An
// empty histogram prints nothing, not even the heading.
func printHist(w *os.File, label string, h map[string]int64, top int) {
	if len(h) == 0 {
		return
	}
	type kv struct {
		k string
		v int64
	}
	rows := make([]kv, 0, len(h))
	for k, v := range h {
		rows = append(rows, kv{k, v})
	}
	sort.Slice(rows, func(i, j int) bool {
		if rows[i].v != rows[j].v {
			return rows[i].v > rows[j].v
		}
		// Deterministic order among equal counts.
		return rows[i].k < rows[j].k
	})
	if top > 0 && len(rows) > top {
		rows = rows[:top]
	}
	fmt.Fprintf(w, "\n%s:\n", label)
	for _, r := range rows {
		fmt.Fprintf(w, " %8d %s\n", r.v, r.k)
	}
}
124 changes: 124 additions & 0 deletions cmd/cosift/purge_domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"context"
"flag"
"fmt"
"os"

"github.com/pilot-protocol/cosift/internal/store"
)

// runPurgeDomain implements the "purge-domain" subcommand: it walks every
// document in the PebbleStore at -dir and soft-deletes those whose host matches
// one of the -suffix entries (dot-boundary match), e.g. -suffix cfd,sbs removes
// every *.cfd and *.sbs page regardless of content.
//
// It is the companion to the crawler's exclude_domains blacklist: the
// blacklist keeps NEW pages from those TLDs out of the crawl, while this
// command clears the backlog that is already indexed. In contrast to
// purge-adult, no content classifier is involved — the host suffix alone
// decides.
//
// DRY RUN BY DEFAULT. Only with -apply are matches removed, through
// store.SoftDeleteDocument. The inverted-index postings stay behind as
// harmless orphans (retrieval skips any docID whose meta is gone), so the cost
// is a few point-deletes per doc instead of a full index rewrite, which keeps
// the sweep tractable on a multi-million-doc corpus. After purging a large
// fraction, run a compaction to reclaim disk and correct IDF.
//
// -readonly opens the store via store.OpenPebbleReadOnly and is rejected when
// combined with -apply. -limit only takes effect together with -apply.
//
//	cosift purge-domain -dir /data/pebble -suffix cfd,sbs            # dry run + report
//	cosift purge-domain -dir /data/pebble -suffix cfd,sbs -apply     # delete
//	cosift purge-domain -dir /data/pebble -suffix cfd,sbs -readonly  # dry run alongside a live serve
func runPurgeDomain(ctx context.Context, args []string) error {
	flags := flag.NewFlagSet("purge-domain", flag.ExitOnError)
	var (
		dir       = flags.String("dir", "", "PebbleStore directory (required; same dir as pebble-serve -dir)")
		suffixCSV = flags.String("suffix", "", "CSV of host/TLD suffixes to purge, dot-boundary match (e.g. cfd,sbs)")
		apply     = flags.Bool("apply", false, "actually soft-delete matches (default: dry run, report only)")
		limit     = flags.Int("limit", 0, "stop after deleting this many docs (0 = no limit)")
		topHosts  = flags.Int("top-hosts", 25, "how many top matched hosts/TLDs to print in the report")
		readonly  = flags.Bool("readonly", false, "open the store read-only (no lock) — runs alongside a live pebble-serve; forces dry run")
	)
	if err := flags.Parse(args); err != nil {
		return err
	}

	// Validate the flag combination before touching the store.
	if *dir == "" {
		return fmt.Errorf("-dir required")
	}
	suffixes := splitDomainsCSV(*suffixCSV)
	if len(suffixes) == 0 {
		return fmt.Errorf("-suffix required (e.g. -suffix cfd,sbs)")
	}
	if *apply && *readonly {
		return fmt.Errorf("-readonly cannot be combined with -apply (read-only opens take no write lock)")
	}

	var (
		ps  *store.PebbleStore
		err error
	)
	switch {
	case *readonly:
		ps, err = store.OpenPebbleReadOnly(*dir)
	default:
		ps, err = store.OpenPebble(*dir)
	}
	if err != nil {
		return fmt.Errorf("open store: %w", err)
	}
	defer ps.Close()

	// Only the middle CorpusStats value is used; it is reported as indexed_docs.
	_, docsBefore, _ := ps.CorpusStats(ctx)
	mode := "APPLY (soft-deleting matches)"
	if !*apply {
		mode = "DRY RUN (no deletes)"
	}
	fmt.Fprintf(os.Stderr, "purge-domain: %s — scanning %d docs for suffixes %v\n", mode, docsBefore, suffixes)

	// maxExamples bounds how many matched URLs are kept for the report.
	const maxExamples = 20
	var (
		nScanned, nMatched, nDeleted int64

		byTLD    = make(map[string]int64)
		byHost   = make(map[string]int64)
		examples []string
	)

	// visit is invoked once per document; the title is not needed for a pure
	// host-suffix match.
	visit := func(docID int64, url, _ string) error {
		nScanned++
		if nScanned%500_000 == 0 {
			fmt.Fprintf(os.Stderr, "purge-domain: scanned %d, matched %d, deleted %d\n", nScanned, nMatched, nDeleted)
		}

		host := hostFromURL(url)
		if !matchesAnyDomain(host, suffixes) {
			return nil
		}
		nMatched++
		byHost[host]++
		byTLD["."+tldOfHost(host)]++
		if len(examples) < maxExamples {
			examples = append(examples, url)
		}

		if !*apply {
			return nil // dry run: count only
		}
		removed, derr := ps.SoftDeleteDocument(ctx, docID, url)
		if derr != nil {
			return fmt.Errorf("delete doc %d: %w", docID, derr)
		}
		if removed {
			nDeleted++
		}
		if *limit > 0 && nDeleted >= int64(*limit) {
			return errStopSweep
		}
		return nil
	}

	err = ps.IterDocMeta(ctx, visit)
	// errStopSweep means -limit was reached, which is a normal way to finish.
	if !(err == nil || err == errStopSweep) {
		return fmt.Errorf("sweep: %w", err)
	}

	_, docsAfter, _ := ps.CorpusStats(ctx)
	fmt.Fprintf(os.Stderr, "\npurge-domain: done — scanned=%d matched=%d deleted=%d\n", nScanned, nMatched, nDeleted)
	fmt.Fprintf(os.Stderr, "purge-domain: corpus indexed_docs %d → %d\n", docsBefore, docsAfter)
	printHist(os.Stderr, "top matched TLDs", byTLD, *topHosts)
	printHist(os.Stderr, "top matched hosts", byHost, *topHosts)
	if len(examples) > 0 {
		fmt.Fprintln(os.Stderr, "\nsample matches:")
		for i := range examples {
			fmt.Fprintln(os.Stderr, " "+examples[i])
		}
	}
	if nMatched > 0 && !*apply {
		fmt.Fprintf(os.Stderr, "\npurge-domain: DRY RUN — re-run with -apply to soft-delete the %d matched docs.\n", nMatched)
	}
	return nil
}
98 changes: 98 additions & 0 deletions cmd/cosift/purge_domain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package main

import (
"context"
"path/filepath"
"testing"
"time"

"github.com/pilot-protocol/cosift/internal/index"
"github.com/pilot-protocol/cosift/internal/store"
)

// TestRunPurgeDomain runs runPurgeDomain end to end against a temporary store.
// The default dry run must leave the documents in place, and -apply must
// remove exactly the documents whose host falls under one of the listed
// suffixes (matched on a dot boundary) while keeping everything else.
func TestRunPurgeDomain(t *testing.T) {
	const (
		cfdURL    = "https://spam1.cfd/a"
		cfdSubURL = "https://x.spam.cfd/b" // subdomain of a .cfd host
		sbsURL    = "https://gamble.sbs/c" // .sbs
		goodURL   = "https://good.com/d"
		docsURL   = "https://docs.example.org/e"
		edgeURL   = "https://notcfd.com/f" // host ends in "cfd" but the TLD is .com: must NOT match
	)

	ctx := context.Background()
	dir := filepath.Join(t.TempDir(), "pebble")

	ps, err := store.OpenPebble(dir)
	if err != nil {
		t.Fatalf("OpenPebble: %v", err)
	}
	bm25 := index.NewPebbleBM25(ps)

	type page struct{ url, title, text string }
	seed := []page{
		{cfdURL, "Spam A", "junk body one"},
		{cfdSubURL, "Spam B", "junk body two"},
		{sbsURL, "Bet C", "junk body three"},
		{goodURL, "Good D", "real useful content"},
		{docsURL, "Docs E", "reference material"},
		{edgeURL, "Edge F", "ends in cfd-ish but is .com"},
	}
	for i := range seed {
		p := seed[i]
		doc := &store.Document{URL: p.url, Title: p.title, Text: p.text, FetchedAt: time.Now()}
		id, err := ps.UpsertDocument(ctx, doc)
		if err != nil {
			t.Fatalf("UpsertDocument %s: %v", p.url, err)
		}
		if err := bm25.IndexDocument(ctx, id, p.title, p.text); err != nil {
			t.Fatalf("IndexDocument %s: %v", p.url, err)
		}
	}
	// Close the seeding handle so runPurgeDomain can open the directory itself.
	ps.Close()

	// Phase 1 — dry run: nothing may be deleted.
	dryArgs := []string{"-dir", dir, "-suffix", "cfd,sbs"}
	if err := runPurgeDomain(ctx, dryArgs); err != nil {
		t.Fatalf("dry run: %v", err)
	}
	assertDocs(t, dir, map[string]bool{
		cfdURL:  true,
		sbsURL:  true,
		goodURL: true,
	})

	// Phase 2 — apply: every *.cfd and *.sbs document is purged.
	applyArgs := []string{"-dir", dir, "-suffix", "cfd,sbs", "-apply"}
	if err := runPurgeDomain(ctx, applyArgs); err != nil {
		t.Fatalf("apply: %v", err)
	}
	assertDocs(t, dir, map[string]bool{
		cfdURL:    false, // purged
		cfdSubURL: false, // purged (subdomain)
		sbsURL:    false, // purged
		goodURL:   true,  // kept
		docsURL:   true,  // kept
		edgeURL:   true,  // kept (dot-boundary: not a .cfd)
	})
}

// assertDocs reopens the store at dir with store.OpenPebble and, for every URL
// in want, reports a test error when the document's presence differs from the
// expected value (true = must exist, false = must be gone). A document counts
// as present when GetDocByURL yields a non-nil result.
func assertDocs(t *testing.T, dir string, want map[string]bool) {
	t.Helper()

	ps, err := store.OpenPebble(dir)
	if err != nil {
		t.Fatalf("reopen: %v", err)
	}
	defer ps.Close()

	ctx := context.Background()
	for url, shouldExist := range want {
		// The lookup error is discarded on purpose: only a nil document is
		// treated as "absent" here.
		doc, _ := ps.GetDocByURL(ctx, url)
		found := doc != nil
		switch {
		case shouldExist && !found:
			t.Errorf("%s: expected present, got deleted", url)
		case !shouldExist && found:
			t.Errorf("%s: expected purged, still present", url)
		}
	}
}

// TestRunPurgeDomainRequiresSuffix checks runPurgeDomain's argument
// validation: it must return an error when -suffix is missing and when -apply
// is combined with -readonly.
func TestRunPurgeDomainRequiresSuffix(t *testing.T) {
	dir := filepath.Join(t.TempDir(), "pebble")

	// Create the store directory, then close the handle straight away so the
	// calls below can open it themselves.
	ps, err := store.OpenPebble(dir)
	if err != nil {
		t.Fatalf("OpenPebble: %v", err)
	}
	ps.Close()

	rejected := []struct {
		args    []string
		failure string // reported when runPurgeDomain unexpectedly succeeds
	}{
		{
			args:    []string{"-dir", dir},
			failure: "expected error when -suffix is empty",
		},
		{
			args:    []string{"-dir", dir, "-suffix", "cfd", "-apply", "-readonly"},
			failure: "expected error when -apply combined with -readonly",
		},
	}
	for _, tc := range rejected {
		if got := runPurgeDomain(context.Background(), tc.args); got == nil {
			t.Error(tc.failure)
		}
	}
}
Loading
Loading