From bf9af62fc0b307065c33902b7b923624cde84951 Mon Sep 17 00:00:00 2001 From: smakarim Date: Sat, 6 Jun 2026 19:49:17 +0500 Subject: [PATCH 1/4] feat(parallel): bounded worker pool FlatMap --- internal/parallel/parallel.go | 39 +++++++++++++++++++ internal/parallel/parallel_test.go | 61 ++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 internal/parallel/parallel.go create mode 100644 internal/parallel/parallel_test.go diff --git a/internal/parallel/parallel.go b/internal/parallel/parallel.go new file mode 100644 index 0000000..52f0102 --- /dev/null +++ b/internal/parallel/parallel.go @@ -0,0 +1,39 @@ +// Package parallel provides a bounded worker pool for fanning out independent work. +package parallel + +import ( + "context" + "sync" +) + +// FlatMap runs fn over items with at most `limit` concurrent workers (limit < 1 is +// treated as 1) and returns all fn results concatenated. The order of the returned +// slice is not significant; callers correlate/sort downstream. fn may return nil. +func FlatMap[T any, R any](ctx context.Context, items []T, limit int, fn func(context.Context, T) []R) []R { + if limit < 1 { + limit = 1 + } + sem := make(chan struct{}, limit) + var ( + mu sync.Mutex + wg sync.WaitGroup + out []R + ) + for _, it := range items { + it := it + sem <- struct{}{} + wg.Add(1) + go func() { + defer wg.Done() + defer func() { <-sem }() + res := fn(ctx, it) + if len(res) > 0 { + mu.Lock() + out = append(out, res...) + mu.Unlock() + } + }() + } + wg.Wait() + return out +} diff --git a/internal/parallel/parallel_test.go b/internal/parallel/parallel_test.go new file mode 100644 index 0000000..909c224 --- /dev/null +++ b/internal/parallel/parallel_test.go @@ -0,0 +1,61 @@ +package parallel + +import ( + "context" + "sync/atomic" + "testing" + "time" +) + +func TestFlatMapAggregates(t *testing.T) { + items := []int{1, 2, 3, 4} + got := FlatMap(context.Background(), items, 2, func(_ context.Context, n int) []int { + return []int{n, n * 10} + }) + if len(got) != 8 { + t.Fatalf("want 8 results, got %d", len(got)) + } + sum := 0 + for _, v := range got { + sum += v + } + if sum != 110 { + t.Fatalf("sum = %d, want 110", sum) + } +} + +func TestFlatMapRespectsLimit(t *testing.T) { + var inFlight, maxSeen int32 + items := make([]int, 40) + FlatMap(context.Background(), items, 4, func(_ context.Context, _ int) []int { + n := atomic.AddInt32(&inFlight, 1) + for { + m := atomic.LoadInt32(&maxSeen) + if n <= m || atomic.CompareAndSwapInt32(&maxSeen, m, n) { + break + } + } + time.Sleep(2 * time.Millisecond) + atomic.AddInt32(&inFlight, -1) + return nil + }) + if maxSeen > 4 { + t.Fatalf("max concurrent = %d, want <= 4", maxSeen) + } + if maxSeen < 2 { + t.Fatalf("expected real concurrency, max = %d", maxSeen) + } +} + +func TestFlatMapToleratesNilAndSerialLimit(t *testing.T) { + items := []int{1, 2, 3} + got := FlatMap(context.Background(), items, 0, func(_ context.Context, n int) []int { + if n == 2 { + return nil + } + return []int{n} + }) + if len(got) != 2 { + t.Fatalf("want 2 (nil tolerated, limit<1 => serial), got %d", len(got)) + } +} From d7a2f99a834793ec7731e28a5ddb7c0bfed3013b Mon Sep 17 00:00:00 2001 From: smakarim Date: Sat, 6 Jun 2026 19:50:30 +0500 Subject: [PATCH 2/4] feat(cli): add concurrency flag --- cmd/revenant/main.go | 3 +++ cmd/revenant/run.go | 1 + 2 files changed, 4 insertions(+) diff --git a/cmd/revenant/main.go b/cmd/revenant/main.go index 6a44b7e..806ab97 100644 --- a/cmd/revenant/main.go +++ b/cmd/revenant/main.go @@ -53,6 +53,7 @@ func main() { analyze bool domain, dorkFile string dorkMax int + concurrency int ) root := &cobra.Command{ Use: "revenant", @@ -80,6 +81,7 @@ func main() { dorkFile: dorkFile, dorkMax: dorkMax, analyze: analyze, + concurrency: concurrency, }) }, } @@ -101,6 +103,7 @@ func main() { root.Flags().StringVar(&dorkFile, "dork-file", "", "custom dork templates ({term}/{domain}); overrides built-in corpus") root.Flags().IntVar(&dorkMax, "dork-max", 200, "cap on total dork hits scanned") root.Flags().BoolVar(&analyze, "analyze", false, "enumerate capabilities of verified keys (GitHub tokens)") + root.Flags().IntVar(&concurrency, "concurrency", 8, "max repos/gists/dork-hits scanned in parallel") if err := root.Execute(); err != nil { fmt.Fprintln(os.Stderr, "error:", err) os.Exit(1) diff --git a/cmd/revenant/run.go b/cmd/revenant/run.go index 75fe0de..e1f1f40 100644 --- a/cmd/revenant/run.go +++ b/cmd/revenant/run.go @@ -37,6 +37,7 @@ type runConfig struct { dorkFile string dorkMax int analyze bool + concurrency int } // scanner is the local alias for the per-repo secret source. From a552c4daa800de6f2272ead1dac6aa74f709ec7f Mon Sep 17 00:00:00 2001 From: smakarim Date: Sat, 6 Jun 2026 19:51:33 +0500 Subject: [PATCH 3/4] feat(cli): scan repos gists and dork hits concurrently --- cmd/revenant/run.go | 51 +++++++++++++++++++++++----------------- cmd/revenant/run_test.go | 4 ++-- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/cmd/revenant/run.go b/cmd/revenant/run.go index e1f1f40..597d81d 100644 --- a/cmd/revenant/run.go +++ b/cmd/revenant/run.go @@ -14,6 +14,7 @@ import ( "github.com/smakarim/revenant/internal/githubclient" "github.com/smakarim/revenant/internal/keyintel" "github.com/smakarim/revenant/internal/model" + "github.com/smakarim/revenant/internal/parallel" "github.com/smakarim/revenant/internal/report" "github.com/smakarim/revenant/internal/scan" "github.com/smakarim/revenant/internal/target" @@ -43,28 +44,36 @@ type runConfig struct { // scanner is the local alias for the per-repo secret source. type scanner = scan.RepoScanner -// runPipeline runs repo scanners over every repo and the gist scanner over every -// gist, appends dork candidates, optionally enriches all candidates, then correlates. -func runPipeline(ctx context.Context, repos []model.RepoRef, scanners []scanner, gists []model.GistRef, gistScanner *scan.GistScanner, dorkCands []model.Candidate, enrich func(context.Context, []model.Candidate) []model.Candidate) ([]model.Finding, error) { - var all []model.Candidate - for _, repo := range repos { +// runPipeline scans every repo (and gist, and dork candidate set) with bounded +// concurrency, optionally enriches, then correlates. Output order is deterministic +// because correlate sorts by score. +func runPipeline(ctx context.Context, repos []model.RepoRef, scanners []scanner, gists []model.GistRef, gistScanner *scan.GistScanner, dorkCands []model.Candidate, enrich func(context.Context, []model.Candidate) []model.Candidate, concurrency int) ([]model.Finding, error) { + repoCands := parallel.FlatMap(ctx, repos, concurrency, func(ctx context.Context, repo model.RepoRef) []model.Candidate { + var c []model.Candidate for _, sc := range scanners { - cands, err := sc.Scan(ctx, repo) + r, err := sc.Scan(ctx, repo) if err != nil { continue } - all = append(all, cands...) + c = append(c, r...) } - } + return c + }) + + var gistCands []model.Candidate if gistScanner != nil { - for _, gist := range gists { - cands, err := gistScanner.ScanGist(ctx, gist) + gistCands = parallel.FlatMap(ctx, gists, concurrency, func(ctx context.Context, g model.GistRef) []model.Candidate { + c, err := gistScanner.ScanGist(ctx, g) if err != nil { - continue + return nil } - all = append(all, cands...) - } + return c + }) } + + all := make([]model.Candidate, 0, len(repoCands)+len(gistCands)+len(dorkCands)) + all = append(all, repoCands...) + all = append(all, gistCands...) all = append(all, dorkCands...) if enrich != nil { all = enrich(ctx, all) @@ -137,24 +146,22 @@ func dorkCandidates(ctx context.Context, cfg runConfig, client *githubclient.Cli fmt.Fprintf(os.Stderr, "revenant: dork hits capped at %d (more exist)\n", cfg.dorkMax) } - var all []model.Candidate - for _, h := range hits { + return parallel.FlatMap(ctx, hits, cfg.concurrency, func(ctx context.Context, h dork.Hit) []model.Candidate { content, err := fetcher.FetchFile(ctx, h.Repo, h.Path) if err != nil { - continue + return nil } occ := model.Occurrence{Repo: h.Repo, SHA: h.Path, Source: "dork"} cands, err := det.Detect(ctx, h.Repo, "dorkhit", content, occ) if err != nil { - continue + return nil } cands, err = validate.Verify(ctx, cands, verifier) if err != nil { - continue + return nil } - all = append(all, cands...) - } - return all + return cands + }) } // run wires production dependencies and executes the pipeline. @@ -205,7 +212,7 @@ func run(ctx context.Context, cfg runConfig) error { } } - findings, err := runPipeline(ctx, repos, scanners, gists, gistScanner, dorkCands, enrich) + findings, err := runPipeline(ctx, repos, scanners, gists, gistScanner, dorkCands, enrich, cfg.concurrency) if err != nil { return err } diff --git a/cmd/revenant/run_test.go b/cmd/revenant/run_test.go index a841804..0a6190a 100644 --- a/cmd/revenant/run_test.go +++ b/cmd/revenant/run_test.go @@ -23,7 +23,7 @@ func TestRunPipelineMergesScanners(t *testing.T) { s2 := fakeScanner{cands: []model.Candidate{{SecretType: "AWS", Raw: "AKIA1", Verified: true, Occurrence: model.Occurrence{Repo: repo, SHA: "d", Source: "deleted"}}}} - findings, err := runPipeline(context.Background(), []model.RepoRef{repo}, []scanner{s1, s2}, nil, nil, nil, nil) + findings, err := runPipeline(context.Background(), []model.RepoRef{repo}, []scanner{s1, s2}, nil, nil, nil, nil, 1) if err != nil { t.Fatal(err) } @@ -45,7 +45,7 @@ func TestRunPipelineAppliesEnrich(t *testing.T) { } return c } - findings, err := runPipeline(context.Background(), []model.RepoRef{repo}, []scanner{s}, nil, nil, nil, enrich) + findings, err := runPipeline(context.Background(), []model.RepoRef{repo}, []scanner{s}, nil, nil, nil, enrich, 1) if err != nil { t.Fatal(err) } From e5c4132a2a24e2804b26fc3e73f0bcce2a9d1652 Mon Sep 17 00:00:00 2001 From: smakarim Date: Sat, 6 Jun 2026 19:52:55 +0500 Subject: [PATCH 4/4] docs: document --concurrency and update limitations --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index fd6a1b5..cc3be10 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,7 @@ Key intelligence: | `--dork-max` | Cap on total dork hits scanned (default 200) | | `--analyze` | Report capabilities of verified keys (GitHub tokens) | | `--verified-only` | Report only secrets confirmed live | +| `--concurrency` | Max repos/gists/dork-hits scanned in parallel (default 8) | | `-o, --output` | Write JSON findings to a file | ## How it works @@ -155,8 +156,9 @@ target -> discover -> fetch -> detect -> validate -> correlate -> report - Recent window. The activity log and Events API cover roughly the last 90 days and a few hundred events per repo. Older force-pushes would need a GH Archive backfill, which is not built yet (the parser exists behind an interface for it). -- Serial execution. Repos, commits, gists, and dork queries are processed one at a time, so - large targets are slow. Concurrency is planned. +- Repos, gists, and dork hits scan concurrently (default 8 workers; tune with `--concurrency`). + Brute-force probing within a single repo is still serial, since it is a rate-limited, opt-in + fallback. - Brute-force is a slow, opt-in fallback. It is rate-limited by GitHub and capped per repo. Prefer a token and the activity tier. - Live-key intelligence covers GitHub tokens. Other key types are not analyzed yet.