Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
1f9b11f
Parallelize versioning3 functional suite
stephanos May 25, 2026
a89673b
Enable worker service for versioning3 parallel suite
stephanos May 25, 2026
310a429
Wait for versioning3 registration in failing paths
stephanos May 25, 2026
e7e263a
Wait for pinned CaN ramping registration
stephanos May 25, 2026
ab5cbcd
Wait for retry ramping registration
stephanos May 25, 2026
b758cec
Extend versioning3 propagation wait
stephanos May 25, 2026
49ebb24
Wait for child ramping registration
stephanos May 25, 2026
adb51fb
Extend pinned version membership wait
stephanos May 25, 2026
9d73b7b
Use registration helper for pinned no-poller test
stephanos May 25, 2026
e0bffa5
Validate pinned membership after registration
stephanos May 25, 2026
0c9694f
Register child version before inheritance test
stephanos May 25, 2026
186ce87
Extend versioning3 backlog count await
stephanos May 25, 2026
ac2fa34
Strengthen versioning3 registration polling
stephanos May 25, 2026
be56b6f
Harden versioning3 deployment API waits
stephanos May 25, 2026
089495b
Harden versioning3 registration waits
stephanos May 25, 2026
b716d23
Re-register inherited child workflow versions
stephanos May 25, 2026
98e7cb4
Validate speculative task history from poll response
stephanos May 25, 2026
f0e1141
Preserve inherited child workflow version
stephanos May 26, 2026
e63b47a
Move child inheritance assertions outside workflows
stephanos May 26, 2026
dd3d5b2
Extend versioning3 test context timeout
stephanos May 26, 2026
1ca13e4
Extend versioning3 verification waits
stephanos May 26, 2026
fbd33ae
Normalize versioning3 verification waits
stephanos May 26, 2026
08eba1f
Revert versioning3 timeout expansion
stephanos May 26, 2026
476a48d
Run versioning3 matching behavior cases sequentially
stephanos May 26, 2026
312d130
Limit concurrent versioning3 test environments
stephanos May 26, 2026
1b903c6
Revert "Limit concurrent versioning3 test environments"
stephanos May 26, 2026
90d819f
Shorten versioning3 await RPC attempts
stephanos May 26, 2026
0a3ffa7
Reduce versioning3 verification poll pressure
stephanos May 26, 2026
81b7b96
Reduce versioning3 propagation partition checks
stephanos May 26, 2026
c29be2a
Use suite-scoped cluster pools
stephanos May 26, 2026
70e4b2c
Revert ineffective versioning3 wait tuning
stephanos May 26, 2026
d613f03
Let child inheritance workflows pause via signal
stephanos May 26, 2026
ccbee9e
Limit child inheritance signal pause to inherit cases
stephanos May 26, 2026
398c4df
Start sticky completion poll before activity finish
stephanos May 26, 2026
4754a90
Harden remaining versioning3 parallel waits
stephanos May 26, 2026
e8763bd
Use shorter RPC deadlines in versioning3 awaits
stephanos May 26, 2026
7ba53dd
Revert "Use shorter RPC deadlines in versioning3 awaits"
stephanos May 26, 2026
52755c3
Retry late activity poller handoff
stephanos May 26, 2026
5e39533
Close standalone versioning3 SDK clients
stephanos May 26, 2026
d771296
Reduce versioning3 retry summary noise
stephanos May 27, 2026
92fd531
Back off await polling with jitter
stephanos May 27, 2026
c6119d3
Tune await backoff for flaky suites
stephanos May 27, 2026
aae3d5f
Add versioning3 await diagnostics
stephanos May 27, 2026
582ffa2
Surface final await timeout attempts
stephanos May 27, 2026
032c195
Report await timeout timing stats
stephanos May 27, 2026
d4b7c8c
Fix testcore global override test setup
stephanos May 27, 2026
61d4107
Preserve await diagnostics in test summaries
stephanos May 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 190 additions & 6 deletions common/testing/await/report.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package await

import (
"cmp"
"fmt"
"slices"
"strings"
"testing"
"time"
Expand All @@ -20,19 +22,112 @@ type attemptFailure struct {
errors []string
}

type awaitStats struct {
attempts []attemptTiming
sleeps []time.Duration
failedAttempts int
stoppedAttempts int
deadlockAttempts int
}

type attemptTiming struct {
attempt int
duration time.Duration
}

func (s *awaitStats) recordAttempt(attempt int, duration time.Duration, failed, stopped, deadlocked bool) {
s.attempts = append(s.attempts, attemptTiming{
attempt: attempt,
duration: duration,
})
if failed {
s.failedAttempts++
}
if stopped {
s.stoppedAttempts++
}
if deadlocked {
s.deadlockAttempts++
}
}

func (s *awaitStats) recordSleep(duration time.Duration) {
s.sleeps = append(s.sleeps, duration)
}

// reportTimeout reports the timeout failure plus collected attempt errors.
func reportTimeout(tb testing.TB, failures []attemptFailure, funcName, timeoutMsg string, effectiveTimeout time.Duration, polls int) {
reportAttemptErrors(tb, failures)
func reportTimeout(
tb testing.TB,
failures []attemptFailure,
stats awaitStats,
parentErr error,
awaitErr error,
deadlineRemaining time.Duration,
funcName string,
timeoutMsg string,
effectiveTimeout time.Duration,
polls int,
) {
var sections []string
sections = append(sections, formatAwaitStats(stats, parentErr, awaitErr, deadlineRemaining, polls))
if s := formatFinalAttemptContext(failures); s != "" {
sections = append(sections, s)
}
if s := formatAttemptErrors(failures); s != "" {
sections = append(sections, s)
}
diagnostics := strings.Join(sections, "\n\n")
if timeoutMsg != "" {
tb.Fatalf("%s: %s (not satisfied after %v, %d polls)", funcName, timeoutMsg, effectiveTimeout, polls)
tb.Fatalf("%s\n\n%s: %s (not satisfied after %v, %d polls)", diagnostics, funcName, timeoutMsg, effectiveTimeout, polls)
} else {
tb.Fatalf("%s: condition not satisfied after %v (%d polls)", funcName, effectiveTimeout, polls)
tb.Fatalf("%s\n\n%s: condition not satisfied after %v (%d polls)", diagnostics, funcName, effectiveTimeout, polls)
}
}

func formatAwaitStats(stats awaitStats, parentErr error, awaitErr error, deadlineRemaining time.Duration, polls int) string {
var b strings.Builder
fmt.Fprintf(&b, "await stats: polls=%d failed_attempts=%d stopped_attempts=%d deadlock_attempts=%d",
polls, stats.failedAttempts, stats.stoppedAttempts, stats.deadlockAttempts)
writeDurationSummary(&b, "attempt_duration", attemptDurations(stats.attempts))
writeDurationSummary(&b, "sleep_duration", stats.sleeps)
writeSlowestAttempts(&b, stats.attempts)
fmt.Fprintf(&b, "\ncontext at timeout: parent_err=%v await_err=%v deadline_remaining=%v",
parentErr, awaitErr, deadlineRemaining)
return b.String()
}

func formatFinalAttemptContext(failures []attemptFailure) string {
if len(failures) == 0 {
return ""
}

var b strings.Builder
last := failures[len(failures)-1]
b.WriteString("last failed attempt before timeout:")
writeAttemptFailure(&b, last)

if previous, ok := previousNonDeadlineFailure(failures); ok && previous.attempt != last.attempt {
b.WriteString("\n\nlast non-deadline failed attempt:")
writeAttemptFailure(&b, previous)
} else if isDeadlineOnlyFailure(last) {
if previous, ok := previousDistinctFailure(failures, last); ok {
b.WriteString("\n\nprevious distinct failed attempt:")
writeAttemptFailure(&b, previous)
}
}

return b.String()
}

func reportAttemptErrors(tb testing.TB, failures []attemptFailure) {
if s := formatAttemptErrors(failures); s != "" {
tb.Errorf("%s", s)
}
}

func formatAttemptErrors(failures []attemptFailure) string {
if len(failures) == 0 {
return
return ""
}

var b strings.Builder
Expand All @@ -51,7 +146,96 @@ func reportAttemptErrors(tb testing.TB, failures []attemptFailure) {
writeAttemptFailure(&b, f)
}
}
tb.Errorf("%s", b.String())
return b.String()
}

func previousDistinctFailure(failures []attemptFailure, last attemptFailure) (attemptFailure, bool) {
lastText := attemptFailureText(last)
for i := len(failures) - 2; i >= 0; i-- {
if attemptFailureText(failures[i]) != lastText {
return failures[i], true
}
}
return attemptFailure{}, false
}

func previousNonDeadlineFailure(failures []attemptFailure) (attemptFailure, bool) {
for i := len(failures) - 1; i >= 0; i-- {
if !hasContextDeadlineFailure(failures[i]) {
return failures[i], true
}
}
return attemptFailure{}, false
}

func isDeadlineOnlyFailure(f attemptFailure) bool {
if len(f.errors) == 0 {
return false
}
text := strings.ToLower(attemptFailureText(f))
if !hasContextDeadlineText(text) {
return false
}
withoutDeadline := strings.ReplaceAll(text, "context deadline exceeded", "")
withoutDeadline = strings.ReplaceAll(withoutDeadline, "context canceled", "")
withoutDeadline = strings.TrimSpace(withoutDeadline)
return withoutDeadline == "" ||
strings.Contains(withoutDeadline, "error trace:") ||
strings.Contains(withoutDeadline, "error:")
}

func hasContextDeadlineFailure(f attemptFailure) bool {
return hasContextDeadlineText(strings.ToLower(attemptFailureText(f)))
}

func hasContextDeadlineText(text string) bool {
return strings.Contains(text, "context deadline exceeded") ||
strings.Contains(text, "context canceled")
}

func attemptFailureText(f attemptFailure) string {
return strings.Join(f.errors, "\n")
}

func writeDurationSummary(b *strings.Builder, label string, durations []time.Duration) {
if len(durations) == 0 {
fmt.Fprintf(b, " %s=(none)", label)
return
}
minDuration, maxDuration, totalDuration := durations[0], durations[0], time.Duration(0)
for _, duration := range durations {
minDuration = min(minDuration, duration)
maxDuration = max(maxDuration, duration)
totalDuration += duration
}
fmt.Fprintf(b, " %s min=%v avg=%v max=%v last=%v",
label, minDuration, totalDuration/time.Duration(len(durations)), maxDuration, durations[len(durations)-1])
}

func writeSlowestAttempts(b *strings.Builder, timings []attemptTiming) {
if len(timings) == 0 {
b.WriteString("\nslowest attempts: (none)")
return
}
slowest := slices.Clone(timings)
slices.SortFunc(slowest, func(a, b attemptTiming) int {
return cmp.Compare(b.duration, a.duration)
})
if len(slowest) > 3 {
slowest = slowest[:3]
}
b.WriteString("\nslowest attempts:")
for _, timing := range slowest {
fmt.Fprintf(b, " #%d=%v", timing.attempt, timing.duration)
}
}

func attemptDurations(timings []attemptTiming) []time.Duration {
durations := make([]time.Duration, 0, len(timings))
for _, timing := range timings {
durations = append(durations, timing.duration)
}
return durations
}

func writeAttemptFailure(b *strings.Builder, f attemptFailure) {
Expand Down
66 changes: 62 additions & 4 deletions common/testing/await/require_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,22 @@ import (
"context"
"fmt"
"os"
"sync/atomic"
"testing"
"time"
)

const requireMisuseHint = "use the *await.T passed to the callback, not s.T() or suite assertion methods"

const (
maxPollInterval = time.Second
pollBackoffMultiplierNum = 3
pollBackoffMultiplierDen = 2
pollJitterDivisor = 10
)

var pollJitterCounter atomic.Uint64

// softDeadlockTimeoutEnvVar overrides the default soft-deadlock timeout.
// Parsed as a Go duration, e.g. "10s".
const softDeadlockTimeoutEnvVar = "TEMPORAL_AWAIT_SOFT_DEADLOCK_TIMEOUT"
Expand Down Expand Up @@ -110,6 +120,8 @@ func run(

var failures []attemptFailure
polls := 0
pollBackoff := newPollBackoff(pollInterval)
stats := awaitStats{}

for {
// Parent context was canceled while we were sleeping (not our deadline).
Expand All @@ -127,8 +139,11 @@ func run(
t := &T{tb: tb, ctx: attemptCtx}

// Run attempt.
attemptStart := time.Now()
res := runAttempt(t, condition, attemptCancel, funcName, cancellable)
attemptDuration := time.Since(attemptStart)
attemptCancel()
stats.recordAttempt(polls, attemptDuration, len(t.errors) > 0, res.stopped, res.deadlocked)
if res.panicVal != nil {
panic(res.panicVal) // propagate to caller
}
Expand Down Expand Up @@ -162,7 +177,7 @@ func run(

// Our deadline expired.
if deadlineReached(deadline) {
reportTimeout(tb, failures, funcName, timeoutMsg, effectiveTimeout, polls)
reportTimeout(tb, failures, stats, parentCtx.Err(), awaitCtx.Err(), time.Until(deadline), funcName, timeoutMsg, effectiveTimeout, polls)
return
}

Expand All @@ -171,8 +186,8 @@ func run(
return
}

// Wait for pollInterval, or context is canceled or deadline is reached.
sleep(awaitCtx, deadline, pollInterval)
// Wait using backoff, or until context is canceled or deadline is reached.
stats.recordSleep(sleep(awaitCtx, deadline, pollBackoff.next()))
}
}

Expand Down Expand Up @@ -262,19 +277,62 @@ func runAttempt(
}
}

func sleep(ctx context.Context, deadline time.Time, pollInterval time.Duration) {
type pollBackoff struct {
current time.Duration
max time.Duration
}

func newPollBackoff(initial time.Duration) pollBackoff {
maxInterval := maxPollInterval
if initial > maxInterval {
maxInterval = initial
}
return pollBackoff{
current: initial,
max: maxInterval,
}
}

func (b *pollBackoff) next() time.Duration {
delay := addJitter(b.current, b.max)
if b.current < b.max {
b.current = min(b.current*pollBackoffMultiplierNum/pollBackoffMultiplierDen, b.max)
}
return delay
}

func addJitter(base, maxDelay time.Duration) time.Duration {
if base <= 0 {
return base
}
jitterRange := base / pollJitterDivisor
if jitterRange <= 0 {
return base
}
seed := uint64(time.Now().UnixNano()) ^ pollJitterCounter.Add(0x9e3779b97f4a7c15)
jitter := time.Duration(seed%uint64(2*jitterRange+1)) - jitterRange
delay := base + jitter
if delay <= 0 {
return base
}
return min(delay, maxDelay)
}

func sleep(ctx context.Context, deadline time.Time, pollInterval time.Duration) time.Duration {
remaining := time.Until(deadline)
if remaining < pollInterval {
pollInterval = remaining
}

start := time.Now()
timer := time.NewTimer(pollInterval)
defer timer.Stop()

select {
case <-ctx.Done():
case <-timer.C:
}
return time.Since(start)
}

func deadlineReached(deadline time.Time) bool {
Expand Down
Loading
Loading