Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 110 additions & 3 deletions roadmap-planner/backend/internal/contributions/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package contributions
import (
"context"
"fmt"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -271,9 +272,88 @@ func (a *Aggregator) Rebuild(ctx context.Context, from, to time.Time) error {
return fmt.Errorf("aggregate jira: %w", err)
}

// TODO(B3): review_latency_p50_hours — derive from
// pull_requests.first_review_at - pull_requests.created_at, p50 by
// (reviewer_id, week). p50 across reviews per author per week.
// Review latency p50, by reviewer × week (W8 2026-05-19).
//
// For each non-bot review row that matches `pr.first_human_review_at`
// (i.e., the first *human* touch on the PR), we record the latency
// in hours from the PR's `created_at`. The week is the week the PR
// was opened — the metric answers "when reviewer X picks up a PR,
// how long after creation does that first response take?" rather
// than "how long does the review process last", which keeps the
// number comparable across reviewers regardless of how long the PR
// sat before they happened to look at it.
//
// p50 is computed in Go (portable across SQLite + Postgres). The
// dataset per (reviewer, week) is small (single-digit reviews for
// most engineers per week), so the in-memory sort is cheap.
latSQL := rebind(fmt.Sprintf(`
SELECT
rv.reviewer_id AS member_id,
%s AS week_start,
%s AS latency_hours
FROM pr_reviews rv
JOIN pull_requests pr ON pr.id = rv.pr_id
WHERE rv.is_bot = 0
AND rv.reviewer_id IS NOT NULL
AND rv.reviewer_id <> ''
AND pr.first_human_review_at IS NOT NULL
AND rv.submitted_at = pr.first_human_review_at
AND pr.created_at >= ?
AND pr.created_at < ?`,
dialect.WeekStart("pr.created_at"),
hoursBetween(dialect, "pr.first_human_review_at", "pr.created_at"),
))
latArgs := []any{from, to}
latRows, err := db.QueryContext(ctx, latSQL, latArgs...)
if err != nil {
return fmt.Errorf("aggregate review latency: %w", err)
}
type latKey struct {
member string
week time.Time
}
pools := map[latKey][]float64{}
for latRows.Next() {
var memberID string
var weekStartRaw any
var hours float64
if err := latRows.Scan(&memberID, &weekStartRaw, &hours); err != nil {
latRows.Close()
return fmt.Errorf("scan review latency: %w", err)
}
weekStart, err := scanTimeAny(weekStartRaw)
if err != nil {
latRows.Close()
return fmt.Errorf("parse review-latency week_start: %w", err)
}
if hours < 0 {
continue
}
pools[latKey{memberID, weekStart}] = append(pools[latKey{memberID, weekStart}], hours)
}
latRows.Close()
if err := latRows.Err(); err != nil {
return fmt.Errorf("iterate review latency: %w", err)
}
if len(pools) > 0 {
upsertSQL := rebind(`
INSERT INTO member_week_metrics (member_id, week_start, pillar_id, component, review_latency_p50_hours)
VALUES (?, ?, '', '', ?)
ON CONFLICT(member_id, week_start, pillar_id, component) DO UPDATE SET
review_latency_p50_hours = excluded.review_latency_p50_hours`)
for k, vs := range pools {
sort.Float64s(vs)
p50 := round1(percentile(vs, 0.50))
// Bind week_start as the same `YYYY-MM-DD` string the other
// INSERTs in Rebuild emit via `dialect.WeekStart(...)`, so the
// composite primary key matches and the ON CONFLICT branch
// fires instead of creating a parallel row.
weekStr := k.week.UTC().Format("2006-01-02")
if _, err := db.ExecContext(ctx, upsertSQL, k.member, weekStr, p50); err != nil {
return fmt.Errorf("upsert review latency for %s/%s: %w", k.member, k.week, err)
}
}
}

// W1 cleanup: every rebuild leaves the four INSERTs above blocked
// on the allowlist, but historical rollup rows for now-denied
Expand Down Expand Up @@ -363,6 +443,33 @@ func (a *Aggregator) RebuildRecent(ctx context.Context, days int) error {
return a.Rebuild(ctx, MondayOf(now.AddDate(0, 0, -days)), MondayOf(now.AddDate(0, 0, 7)))
}

// scanTimeAny converts a driver value emitted by `dialect.WeekStart(...)`
// to a time.Time in UTC. SQLite returns the strftime result as TEXT
// ("YYYY-MM-DD"); Postgres returns date_trunc as time.Time directly.
//
// Accepted dynamic types are nil, time.Time, string and []byte. A nil
// value and empty text (an empty string or a zero-length []byte) both
// yield the zero time with a nil error. Non-empty text that matches none
// of the layouts known to parseDateString, or a value of any other type,
// yields an error.
func scanTimeAny(v any) (time.Time, error) {
	var text string
	switch t := v.(type) {
	case nil:
		return time.Time{}, nil
	case time.Time:
		return t.UTC(), nil
	case string:
		text = t
	case []byte:
		text = string(t)
	default:
		return time.Time{}, fmt.Errorf("unsupported time scan type %T", v)
	}
	// Empty text is treated like nil: there is no timestamp to parse, so
	// report the zero time instead of a layout-mismatch error.
	if text == "" {
		return time.Time{}, nil
	}
	return parseDateString(text)
}

// parseDateString parses s against a fixed list of layouts, trying the
// date-only form first, then "date time", then RFC 3339 with and without
// fractional seconds. The first layout that parses wins and the result is
// normalized to UTC. If no layout matches, an error quoting s is returned.
func parseDateString(s string) (time.Time, error) {
	for _, layout := range []string{"2006-01-02", "2006-01-02 15:04:05", time.RFC3339, time.RFC3339Nano} {
		if t, err := time.Parse(layout, s); err == nil {
			return t.UTC(), nil
		}
	}
	return time.Time{}, fmt.Errorf("week_start %q: no layout matched", s)
}

// MondayOf returns the Monday 00:00 UTC of t's week.
func MondayOf(t time.Time) time.Time {
wd := int(t.Weekday())
Expand Down
106 changes: 106 additions & 0 deletions roadmap-planner/backend/internal/contributions/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,109 @@ func TestAggregatorAllowlist(t *testing.T) {
}
}
}

// TestAggregatorReviewLatency checks the per-reviewer weekly p50 of
// `first_human_review_at - created_at` produced by Rebuild.
//
// Three PRs are created at the same instant in one week, and `alice` is the
// first human reviewer on each, with latencies of 1, 4 and 9 hours. A bot
// review (is_bot = 1) is added to the first PR. The test expects exactly
// one metrics row for alice, with a p50 of 4.0 hours.
func TestAggregatorReviewLatency(t *testing.T) {
	ctx := context.Background()

	// Fresh on-disk SQLite database, migrated to the current schema.
	dbPath := filepath.Join(t.TempDir(), "lat.db")
	store, err := storage.OpenSQLite(dbPath)
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	t.Cleanup(func() { _ = store.Close() })
	if err := store.Migrate(ctx); err != nil {
		t.Fatalf("migrate: %v", err)
	}

	members := []storage.Member{
		{ID: "alice", DisplayName: "Alice", GitHubLogin: "alice", Active: true},
		{ID: "bot", DisplayName: "Bot", Active: true},
	}
	for _, member := range members {
		if err := store.UpsertMember(ctx, member); err != nil {
			t.Fatalf("upsert %s: %v", member.ID, err)
		}
	}

	// All PRs open 36 hours after the Monday of the current week.
	created := MondayOf(time.Now().UTC().Truncate(time.Hour)).Add(36 * time.Hour)
	week := MondayOf(created)
	windowFrom := week.Add(-7 * 24 * time.Hour)
	windowTo := week.Add(14 * 24 * time.Hour)

	latencies := []int{1, 4, 9}
	prs := make([]storage.PullRequest, 0, len(latencies))
	reviews := make([]storage.PRReview, 0, len(latencies)+1)
	for idx, hours := range latencies {
		number := idx + 1
		prID := "alaudadevops/x#" + itoa(number)
		// Declared per iteration so each PR points at its own timestamp.
		reviewedAt := created.Add(time.Duration(hours) * time.Hour)
		prs = append(prs, storage.PullRequest{
			ID:                 prID,
			Source:             "github",
			RepoID:             "alaudadevops/x",
			Number:             number,
			Title:              "PR",
			State:              "merged",
			AuthorLogin:        "danielfbm",
			CreatedAt:          created,
			FirstReviewAt:      &reviewedAt,
			FirstHumanReviewAt: &reviewedAt,
			FetchedAt:          created,
		})
		reviews = append(reviews, storage.PRReview{
			ID:            prID + "/r1",
			PRID:          prID,
			Source:        "github",
			ReviewerID:    "alice",
			ReviewerLogin: "alice",
			State:         "approved",
			SubmittedAt:   reviewedAt,
		})
	}
	// Bot review on the first PR; the aggregate is expected to ignore it.
	reviews = append(reviews, storage.PRReview{
		ID:            "alaudadevops/x#1/r-bot",
		PRID:          "alaudadevops/x#1",
		Source:        "github",
		ReviewerID:    "bot",
		ReviewerLogin: "renovate",
		State:         "commented",
		SubmittedAt:   created.Add(30 * time.Minute),
		IsBot:         true,
	})

	if err := store.UpsertPullRequests(ctx, prs); err != nil {
		t.Fatalf("upsert prs: %v", err)
	}
	if err := store.UpsertPRReviews(ctx, reviews); err != nil {
		t.Fatalf("upsert reviews: %v", err)
	}

	if err := NewAggregator(store).Rebuild(ctx, windowFrom, windowTo); err != nil {
		t.Fatalf("rebuild: %v", err)
	}

	rows, err := store.MemberWeekMetrics(ctx, storage.MemberWeekQuery{
		From:      windowFrom,
		To:        windowTo,
		MemberIDs: []string{"alice"},
	})
	if err != nil {
		t.Fatalf("read: %v", err)
	}
	if len(rows) != 1 {
		t.Fatalf("expected 1 row, got %d: %+v", len(rows), rows)
	}

	got := rows[0].ReviewLatencyP50Hours
	if got == nil {
		t.Fatalf("review_latency_p50_hours nil; want 4.0")
	}
	if *got != 4.0 {
		t.Errorf("review_latency_p50_hours = %v, want 4.0", *got)
	}
}

// itoa returns the base-10 text of n, with a leading '-' for negative
// values and "0" for zero.
//
// Digits are produced from the unsigned magnitude of n so that the most
// negative int, whose negation does not fit in an int, still formats
// correctly instead of collapsing to a bare "-".
func itoa(n int) string {
	mag := uint(n)
	if n < 0 {
		// Two's-complement negation in unsigned space yields |n| even
		// for the minimum int.
		mag = -mag
	}

	// 20 bytes hold the sign plus the 19 digits of a 64-bit minimum int.
	var buf [20]byte
	pos := len(buf)
	// Fill right-to-left; the loop always emits at least one digit, which
	// covers n == 0.
	for {
		pos--
		buf[pos] = '0' + byte(mag%10)
		mag /= 10
		if mag == 0 {
			break
		}
	}
	if n < 0 {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}
Loading