diff --git a/common/testing/await/report.go b/common/testing/await/report.go index 49a56a0937..45ee5953aa 100644 --- a/common/testing/await/report.go +++ b/common/testing/await/report.go @@ -1,7 +1,9 @@ package await import ( + "cmp" "fmt" + "slices" "strings" "testing" "time" @@ -20,19 +22,112 @@ type attemptFailure struct { errors []string } +type awaitStats struct { + attempts []attemptTiming + sleeps []time.Duration + failedAttempts int + stoppedAttempts int + deadlockAttempts int +} + +type attemptTiming struct { + attempt int + duration time.Duration +} + +func (s *awaitStats) recordAttempt(attempt int, duration time.Duration, failed, stopped, deadlocked bool) { + s.attempts = append(s.attempts, attemptTiming{ + attempt: attempt, + duration: duration, + }) + if failed { + s.failedAttempts++ + } + if stopped { + s.stoppedAttempts++ + } + if deadlocked { + s.deadlockAttempts++ + } +} + +func (s *awaitStats) recordSleep(duration time.Duration) { + s.sleeps = append(s.sleeps, duration) +} + // reportTimeout reports the timeout failure plus collected attempt errors. 
-func reportTimeout(tb testing.TB, failures []attemptFailure, funcName, timeoutMsg string, effectiveTimeout time.Duration, polls int) { - reportAttemptErrors(tb, failures) +func reportTimeout( + tb testing.TB, + failures []attemptFailure, + stats awaitStats, + parentErr error, + awaitErr error, + deadlineRemaining time.Duration, + funcName string, + timeoutMsg string, + effectiveTimeout time.Duration, + polls int, +) { + var sections []string + sections = append(sections, formatAwaitStats(stats, parentErr, awaitErr, deadlineRemaining, polls)) + if s := formatFinalAttemptContext(failures); s != "" { + sections = append(sections, s) + } + if s := formatAttemptErrors(failures); s != "" { + sections = append(sections, s) + } + diagnostics := strings.Join(sections, "\n\n") if timeoutMsg != "" { - tb.Fatalf("%s: %s (not satisfied after %v, %d polls)", funcName, timeoutMsg, effectiveTimeout, polls) + tb.Fatalf("%s\n\n%s: %s (not satisfied after %v, %d polls)", diagnostics, funcName, timeoutMsg, effectiveTimeout, polls) } else { - tb.Fatalf("%s: condition not satisfied after %v (%d polls)", funcName, effectiveTimeout, polls) + tb.Fatalf("%s\n\n%s: condition not satisfied after %v (%d polls)", diagnostics, funcName, effectiveTimeout, polls) + } +} + +func formatAwaitStats(stats awaitStats, parentErr error, awaitErr error, deadlineRemaining time.Duration, polls int) string { + var b strings.Builder + fmt.Fprintf(&b, "await stats: polls=%d failed_attempts=%d stopped_attempts=%d deadlock_attempts=%d", + polls, stats.failedAttempts, stats.stoppedAttempts, stats.deadlockAttempts) + writeDurationSummary(&b, "attempt_duration", attemptDurations(stats.attempts)) + writeDurationSummary(&b, "sleep_duration", stats.sleeps) + writeSlowestAttempts(&b, stats.attempts) + fmt.Fprintf(&b, "\ncontext at timeout: parent_err=%v await_err=%v deadline_remaining=%v", + parentErr, awaitErr, deadlineRemaining) + return b.String() +} + +func formatFinalAttemptContext(failures []attemptFailure) 
string { + if len(failures) == 0 { + return "" + } + + var b strings.Builder + last := failures[len(failures)-1] + b.WriteString("last failed attempt before timeout:") + writeAttemptFailure(&b, last) + + if previous, ok := previousNonDeadlineFailure(failures); ok && previous.attempt != last.attempt { + b.WriteString("\n\nlast non-deadline failed attempt:") + writeAttemptFailure(&b, previous) + } else if isDeadlineOnlyFailure(last) { + if previous, ok := previousDistinctFailure(failures, last); ok { + b.WriteString("\n\nprevious distinct failed attempt:") + writeAttemptFailure(&b, previous) + } } + + return b.String() } func reportAttemptErrors(tb testing.TB, failures []attemptFailure) { + if s := formatAttemptErrors(failures); s != "" { + tb.Errorf("%s", s) + } +} + +func formatAttemptErrors(failures []attemptFailure) string { if len(failures) == 0 { - return + return "" } var b strings.Builder @@ -51,7 +146,96 @@ func reportAttemptErrors(tb testing.TB, failures []attemptFailure) { writeAttemptFailure(&b, f) } } - tb.Errorf("%s", b.String()) + return b.String() +} + +func previousDistinctFailure(failures []attemptFailure, last attemptFailure) (attemptFailure, bool) { + lastText := attemptFailureText(last) + for i := len(failures) - 2; i >= 0; i-- { + if attemptFailureText(failures[i]) != lastText { + return failures[i], true + } + } + return attemptFailure{}, false +} + +func previousNonDeadlineFailure(failures []attemptFailure) (attemptFailure, bool) { + for i := len(failures) - 1; i >= 0; i-- { + if !hasContextDeadlineFailure(failures[i]) { + return failures[i], true + } + } + return attemptFailure{}, false +} + +func isDeadlineOnlyFailure(f attemptFailure) bool { + if len(f.errors) == 0 { + return false + } + text := strings.ToLower(attemptFailureText(f)) + if !hasContextDeadlineText(text) { + return false + } + withoutDeadline := strings.ReplaceAll(text, "context deadline exceeded", "") + withoutDeadline = strings.ReplaceAll(withoutDeadline, "context canceled", 
"") + withoutDeadline = strings.TrimSpace(withoutDeadline) + return withoutDeadline == "" || + strings.Contains(withoutDeadline, "error trace:") || + strings.Contains(withoutDeadline, "error:") +} + +func hasContextDeadlineFailure(f attemptFailure) bool { + return hasContextDeadlineText(strings.ToLower(attemptFailureText(f))) +} + +func hasContextDeadlineText(text string) bool { + return strings.Contains(text, "context deadline exceeded") || + strings.Contains(text, "context canceled") +} + +func attemptFailureText(f attemptFailure) string { + return strings.Join(f.errors, "\n") +} + +func writeDurationSummary(b *strings.Builder, label string, durations []time.Duration) { + if len(durations) == 0 { + fmt.Fprintf(b, " %s=(none)", label) + return + } + minDuration, maxDuration, totalDuration := durations[0], durations[0], time.Duration(0) + for _, duration := range durations { + minDuration = min(minDuration, duration) + maxDuration = max(maxDuration, duration) + totalDuration += duration + } + fmt.Fprintf(b, " %s min=%v avg=%v max=%v last=%v", + label, minDuration, totalDuration/time.Duration(len(durations)), maxDuration, durations[len(durations)-1]) +} + +func writeSlowestAttempts(b *strings.Builder, timings []attemptTiming) { + if len(timings) == 0 { + b.WriteString("\nslowest attempts: (none)") + return + } + slowest := slices.Clone(timings) + slices.SortFunc(slowest, func(a, b attemptTiming) int { + return cmp.Compare(b.duration, a.duration) + }) + if len(slowest) > 3 { + slowest = slowest[:3] + } + b.WriteString("\nslowest attempts:") + for _, timing := range slowest { + fmt.Fprintf(b, " #%d=%v", timing.attempt, timing.duration) + } +} + +func attemptDurations(timings []attemptTiming) []time.Duration { + durations := make([]time.Duration, 0, len(timings)) + for _, timing := range timings { + durations = append(durations, timing.duration) + } + return durations } func writeAttemptFailure(b *strings.Builder, f attemptFailure) { diff --git 
a/common/testing/await/require_ctx.go b/common/testing/await/require_ctx.go index a830d551d2..4c29b15fef 100644 --- a/common/testing/await/require_ctx.go +++ b/common/testing/await/require_ctx.go @@ -4,12 +4,22 @@ import ( "context" "fmt" "os" + "sync/atomic" "testing" "time" ) const requireMisuseHint = "use the *await.T passed to the callback, not s.T() or suite assertion methods" +const ( + maxPollInterval = time.Second + pollBackoffMultiplierNum = 3 + pollBackoffMultiplierDen = 2 + pollJitterDivisor = 10 +) + +var pollJitterCounter atomic.Uint64 + // softDeadlockTimeoutEnvVar overrides the default soft-deadlock timeout. // Parsed as a Go duration, e.g. "10s". const softDeadlockTimeoutEnvVar = "TEMPORAL_AWAIT_SOFT_DEADLOCK_TIMEOUT" @@ -110,6 +120,8 @@ func run( var failures []attemptFailure polls := 0 + pollBackoff := newPollBackoff(pollInterval) + stats := awaitStats{} for { // Parent context was canceled while we were sleeping (not our deadline). @@ -127,8 +139,11 @@ func run( t := &T{tb: tb, ctx: attemptCtx} // Run attempt. + attemptStart := time.Now() res := runAttempt(t, condition, attemptCancel, funcName, cancellable) + attemptDuration := time.Since(attemptStart) attemptCancel() + stats.recordAttempt(polls, attemptDuration, len(t.errors) > 0, res.stopped, res.deadlocked) if res.panicVal != nil { panic(res.panicVal) // propagate to caller } @@ -162,7 +177,7 @@ func run( // Our deadline expired. if deadlineReached(deadline) { - reportTimeout(tb, failures, funcName, timeoutMsg, effectiveTimeout, polls) + reportTimeout(tb, failures, stats, parentCtx.Err(), awaitCtx.Err(), time.Until(deadline), funcName, timeoutMsg, effectiveTimeout, polls) return } @@ -171,8 +186,8 @@ func run( return } - // Wait for pollInterval, or context is canceled or deadline is reached. - sleep(awaitCtx, deadline, pollInterval) + // Wait using backoff, or until context is canceled or deadline is reached. 
+ stats.recordSleep(sleep(awaitCtx, deadline, pollBackoff.next())) } } @@ -262,12 +277,54 @@ func runAttempt( } } -func sleep(ctx context.Context, deadline time.Time, pollInterval time.Duration) { +type pollBackoff struct { + current time.Duration + max time.Duration +} + +func newPollBackoff(initial time.Duration) pollBackoff { + maxInterval := maxPollInterval + if initial > maxInterval { + maxInterval = initial + } + return pollBackoff{ + current: initial, + max: maxInterval, + } +} + +func (b *pollBackoff) next() time.Duration { + delay := addJitter(b.current, b.max) + if b.current < b.max { + b.current = min(b.current*pollBackoffMultiplierNum/pollBackoffMultiplierDen, b.max) + } + return delay +} + +func addJitter(base, maxDelay time.Duration) time.Duration { + if base <= 0 { + return base + } + jitterRange := base / pollJitterDivisor + if jitterRange <= 0 { + return base + } + seed := uint64(time.Now().UnixNano()) ^ pollJitterCounter.Add(0x9e3779b97f4a7c15) + jitter := time.Duration(seed%uint64(2*jitterRange+1)) - jitterRange + delay := base + jitter + if delay <= 0 { + return base + } + return min(delay, maxDelay) +} + +func sleep(ctx context.Context, deadline time.Time, pollInterval time.Duration) time.Duration { remaining := time.Until(deadline) if remaining < pollInterval { pollInterval = remaining } + start := time.Now() timer := time.NewTimer(pollInterval) defer timer.Stop() @@ -275,6 +332,7 @@ func sleep(ctx context.Context, deadline time.Time, pollInterval time.Duration) case <-ctx.Done(): case <-timer.C: } + return time.Since(start) } func deadlineReached(deadline time.Time) bool { diff --git a/common/testing/await/require_ctx_test.go b/common/testing/await/require_ctx_test.go index 5810f46eb5..c44ff5dd12 100644 --- a/common/testing/await/require_ctx_test.go +++ b/common/testing/await/require_ctx_test.go @@ -146,11 +146,44 @@ func TestRequire_PollIntervalStartsAfterAttemptFinishes(t *testing.T) { require.Len(t, attemptEnds, 3) for i := 1; i < 
len(attemptStarts); i++ { gap := attemptStarts[i].Sub(attemptEnds[i-1]) - require.GreaterOrEqual(t, gap, pollInterval, + require.GreaterOrEqual(t, gap, pollInterval-pollInterval/10, "poll interval should run after attempt finishes (gap=%v < %v)", gap, pollInterval) } } +func TestRequire_PollIntervalBacksOff(t *testing.T) { + t.Parallel() + + var attempts atomic.Int32 + var attemptStarts []time.Time + var attemptEnds []time.Time + attemptDuration := 10 * time.Millisecond + pollInterval := 20 * time.Millisecond + + await.Require(t.Context(), t, func(t *await.T) { + attemptStarts = append(attemptStarts, time.Now()) + defer func() { attemptEnds = append(attemptEnds, time.Now()) }() + + time.Sleep(attemptDuration) //nolint:forbidigo // simulate attempt work to measure gap between attempts + + if attempts.Add(1) < 4 { + t.Error("not ready") + } + }, time.Second, pollInterval) + + require.Equal(t, int32(4), attempts.Load()) + require.Len(t, attemptStarts, 4) + require.Len(t, attemptEnds, 4) + + var gaps []time.Duration + for i := 1; i < len(attemptStarts); i++ { + gaps = append(gaps, attemptStarts[i].Sub(attemptEnds[i-1])) + } + require.GreaterOrEqual(t, gaps[0], pollInterval-pollInterval/10) + require.GreaterOrEqual(t, gaps[1], 3*pollInterval/2-(3*pollInterval/2)/10) + require.GreaterOrEqual(t, gaps[2], 9*pollInterval/4-(9*pollInterval/4)/10) +} + func TestRequire_FailureScenarios(t *testing.T) { t.Parallel() @@ -267,8 +300,39 @@ func TestRequire_FailureScenarios(t *testing.T) { }, time.Second, 100*time.Millisecond) }) require.True(t, tb.Failed()) - require.Contains(t, tb.fatals(), "not satisfied after") - require.Equal(t, "attempt errors:\n\n --- attempt 1 ---\n first attempt error\n\n --- attempt 2 ---\n last attempt error", tb.errors()) + fatals := tb.fatals() + require.Contains(t, fatals, "not satisfied after") + require.Contains(t, fatals, "await stats: polls=2 failed_attempts=2") + require.Contains(t, fatals, "attempt_duration min=") + require.Contains(t, fatals, 
"sleep_duration min=") + require.Contains(t, fatals, "slowest attempts:") + require.Contains(t, fatals, "context at timeout:") + require.Contains(t, fatals, "last failed attempt before timeout:\n\n --- attempt 2 ---\n last attempt error") + require.Contains(t, fatals, "attempt errors:\n\n --- attempt 1 ---\n first attempt error\n\n --- attempt 2 ---\n last attempt error") + require.Equal(t, int32(2), attempts.Load()) + }) + + t.Run("reports previous distinct attempt when last attempt is only deadline", func(t *testing.T) { + t.Parallel() + + ctx := testcontext.New(t) + var attempts atomic.Int32 + tb := newRecordingTB() + tb.run(func() { + await.Require(ctx, tb, func(t *await.T) { + if attempts.Add(1) == 1 { + t.Error("observed state: version is still draining") + return + } + <-t.Context().Done() + t.Error("context deadline exceeded") + }, time.Second, 100*time.Millisecond) + }) + require.True(t, tb.Failed()) + fatals := tb.fatals() + require.Contains(t, fatals, "not satisfied after") + require.Contains(t, fatals, "last failed attempt before timeout:\n\n --- attempt 2 ---\n context deadline exceeded") + require.Contains(t, fatals, "last non-deadline failed attempt:\n\n --- attempt 1 ---\n observed state: version is still draining") require.Equal(t, int32(2), attempts.Load()) }) @@ -290,12 +354,13 @@ func TestRequire_FailureScenarios(t *testing.T) { n := attempts.Load() require.Greater(t, n, int32(4), "need >4 attempts to exercise truncation") - errs := tb.errors() - require.Contains(t, errs, "attempt errors:\n\n --- attempt 1 ---\n attempt 1 failed\n") - require.Contains(t, errs, fmt.Sprintf("... %d attempts omitted ...", n-4)) + fatals := tb.fatals() + require.Contains(t, fatals, fmt.Sprintf("last failed attempt before timeout:\n\n --- attempt %d ---\n attempt %d failed", n, n)) + require.Contains(t, fatals, "attempt errors:\n\n --- attempt 1 ---\n attempt 1 failed\n") + require.Contains(t, fatals, fmt.Sprintf("... 
%d attempts omitted ...", n-4)) // Last three attempts present in order. for i := n - 2; i <= n; i++ { - require.Contains(t, errs, fmt.Sprintf("--- attempt %d ---\n attempt %d failed", i, i)) + require.Contains(t, fatals, fmt.Sprintf("--- attempt %d ---\n attempt %d failed", i, i)) } }) diff --git a/common/testing/parallelsuite/suite.go b/common/testing/parallelsuite/suite.go index ca32645449..5ebadc360e 100644 --- a/common/testing/parallelsuite/suite.go +++ b/common/testing/parallelsuite/suite.go @@ -104,6 +104,16 @@ func (s *Suite[T]) Run(name string, fn func(T)) bool { }) } +// RunSequential creates a sequential subtest. The callback receives a fresh copy +// of the concrete suite type, initialized for the subtest's *testing.T. +func (s *Suite[T]) RunSequential(name string, fn func(T)) bool { + pt := s.guardT.T // grab T before sealing + s.guardT.markHasSubtests() + return pt.Run(name, func(t *testing.T) { + fn(s.copySuite(t, false, nil, nil).(T)) + }) +} + // Await calls fn repeatedly until all assertions pass or timeout is reached. 
func (s *Suite[T]) Await(fn func(T), timeout, interval time.Duration) { s.Awaitf(fn, timeout, interval, "") diff --git a/common/testing/parallelsuite/suite_test.go b/common/testing/parallelsuite/suite_test.go index 60b5cd6914..98f438f684 100644 --- a/common/testing/parallelsuite/suite_test.go +++ b/common/testing/parallelsuite/suite_test.go @@ -100,6 +100,15 @@ func (s *contextSuite) TestAwaitUsesSuiteContext() { }, 100*time.Millisecond, time.Millisecond) } +type sequentialSubtestSuite struct{ Suite[*sequentialSubtestSuite] } + +func (s *sequentialSubtestSuite) TestRunSequential() { + s.True(s.runParallel) + s.RunSequential("subtest", func(s *sequentialSubtestSuite) { + s.False(s.runParallel) + }) +} + type sealAfterRunSuite struct{ Suite[*sealAfterRunSuite] } func (s *sealAfterRunSuite) TestAssertionAfterRun() { @@ -133,6 +142,9 @@ func TestRun_AcceptsSuite(t *testing.T) { t.Run("context", func(t *testing.T) { require.NotPanics(t, func() { Run(t, &contextSuite{}) }) }) + t.Run("sequential subtest", func(t *testing.T) { + require.NotPanics(t, func() { Run(t, &sequentialSubtestSuite{}) }) + }) } func TestRun_RejectsSuite(t *testing.T) { diff --git a/tests/testcore/functional_test_base.go b/tests/testcore/functional_test_base.go index 01feb7e670..278bb02772 100644 --- a/tests/testcore/functional_test_base.go +++ b/tests/testcore/functional_test_base.go @@ -276,7 +276,8 @@ func (s *FunctionalTestBase) TearDownSuite() { func (s *FunctionalTestBase) SetupSuiteWithCluster(options ...TestClusterOption) { // Acquire a slot from the dedicated test cluster pool. - testClusterPool.dedicated.acquireSlot(s.T()) + params := ApplyTestClusterOptions(options) + testClusterPool.acquireDedicatedSlot(s.T(), params.EnableWorkerService) s.setupCluster(options...) 
} diff --git a/tests/testcore/test_cluster_pool.go b/tests/testcore/test_cluster_pool.go index 7234ed79a0..afcacfffd0 100644 --- a/tests/testcore/test_cluster_pool.go +++ b/tests/testcore/test_cluster_pool.go @@ -40,15 +40,8 @@ func init() { maxUsage = 50 } - sharedPool := newPool(sharedSize, false) - sharedPool.maxUsage = maxUsage - - dedicatedPool := newPool(dedicatedSize, true) - dedicatedPool.maxUsage = maxUsage - testClusterPool = &clusterPool{ - shared: sharedPool, - dedicated: dedicatedPool, + pools: newClusterPools(sharedSize, dedicatedSize, maxUsage), } } @@ -82,6 +75,33 @@ func newPool(size int, exclusive bool) *pool { return p } +func newPoolWithMaxUsage(size int, exclusive bool, maxUsage int) *pool { + p := newPool(size, exclusive) + p.maxUsage = maxUsage + return p +} + +func newClusterPools(sharedSize, dedicatedSize, maxUsage int) map[clusterPoolKey]*pool { + pools := make(map[clusterPoolKey]*pool, 4) + for _, key := range []clusterPoolKey{ + {kind: poolKindShared}, + {kind: poolKindShared, workerService: true}, + {kind: poolKindDedicated}, + {kind: poolKindDedicated, workerService: true}, + } { + size := sharedSize + if key.kind == poolKindDedicated { + size = dedicatedSize + } + pools[key] = newPoolWithMaxUsage(size, key.kind == poolKindDedicated, maxUsage) + } + return pools +} + +func DefaultSuiteClusterPoolSize() int { + return max(1, runtime.GOMAXPROCS(0)/2) +} + // get returns a cluster from the pool, creating it lazily if needed. // For exclusive pools, blocks until a slot is available and registers cleanup. // For shared pools, uses round-robin. @@ -133,79 +153,87 @@ func (p *pool) acquireSlot(t *testing.T) { } type clusterPool struct { - shared *pool - dedicated *pool + pools map[clusterPoolKey]*pool suiteScoped sync.Map } type suiteScopedCluster struct { - once sync.Once - cluster *FunctionalTestBase + pools map[clusterPoolKey]*pool } -// UseSuiteScopedCluster makes NewEnv use one cluster for all tests under `t`. 
-// The cluster is created on first use and torn down when `t` completes. -// -// Deprecated: this only exists for backwards-compatibility with legacy sequential -// suite execution. -func UseSuiteScopedCluster(t *testing.T) { +type poolKind int + +const ( + poolKindShared poolKind = iota + poolKindDedicated +) + +type clusterPoolKey struct { + kind poolKind + workerService bool +} + +// UseSuiteScopedClusters makes NewEnv use suite-local cluster pools for all +// tests under `t`. Clusters are created on first use and torn down when `t` +// completes. +func UseSuiteScopedClusters(t *testing.T, size int) { t.Helper() + if size <= 0 { + t.Fatalf("suite-scoped cluster pool size must be positive, got %d", size) + } rootName, _, _ := strings.Cut(t.Name(), "/") if t.Name() != rootName { - t.Fatalf("UseSuiteScopedCluster must be called from a top-level test, got %q", t.Name()) + t.Fatalf("UseSuiteScopedClusters must be called from a top-level test, got %q", t.Name()) + } + suiteCluster := &suiteScopedCluster{ + pools: map[clusterPoolKey]*pool{ + {kind: poolKindShared}: newPool(size, false), + {kind: poolKindShared, workerService: true}: newPool(size, false), + }, + } + actual, loaded := testClusterPool.suiteScoped.LoadOrStore(rootName, suiteCluster) + if loaded { + suiteCluster = actual.(*suiteScopedCluster) } - testClusterPool.suiteScoped.LoadOrStore(rootName, &suiteScopedCluster{}) t.Cleanup(func() { - suiteClusterAny, ok := testClusterPool.suiteScoped.Load(rootName) - if ok { - suiteCluster := suiteClusterAny.(*suiteScopedCluster) - if suiteCluster.cluster != nil { - if err := suiteCluster.cluster.testCluster.TearDownCluster(); err != nil { - t.Logf("Failed to tear down suite-scoped cluster: %v", err) - } - } - } + suiteCluster.tearDown(t) testClusterPool.suiteScoped.Delete(rootName) }) } -func (p *clusterPool) get(t *testing.T, dedicated bool, dynamicConfig map[dynamicconfig.Key]any, clusterOpts []TestClusterOption) *FunctionalTestBase { +func (p *clusterPool) get(t 
*testing.T, dedicated bool, workerService bool, dynamicConfig map[dynamicconfig.Key]any, clusterOpts []TestClusterOption) *FunctionalTestBase { if dedicated || len(dynamicConfig) > 0 || len(clusterOpts) > 0 { - return p.getDedicated(t, dynamicConfig, clusterOpts) + return p.getDedicated(t, workerService, dynamicConfig, clusterOpts) } - if cluster := p.getSuiteScoped(t); cluster != nil { + if cluster := p.getSuiteScoped(t, workerService); cluster != nil { return cluster } - return p.getShared(t) + return p.getPooled(t, clusterPoolKey{ + kind: poolKindShared, + workerService: workerService, + }, nil, true, nil) } -func (p *clusterPool) getShared(t *testing.T) *FunctionalTestBase { - return p.shared.get(t, func() *FunctionalTestBase { - return p.createCluster(t, nil, true, nil) - }) -} - -func (p *clusterPool) getSuiteScoped(t *testing.T) *FunctionalTestBase { +func (p *clusterPool) getSuiteScoped(t *testing.T, workerService bool) *FunctionalTestBase { rootName, _, _ := strings.Cut(t.Name(), "/") - if _, ok := p.suiteScoped.Load(rootName); !ok { + suiteClusterAny, ok := p.suiteScoped.Load(rootName) + if !ok { return nil } - - suiteClusterAny, _ := p.suiteScoped.LoadOrStore(rootName, &suiteScopedCluster{}) suiteCluster := suiteClusterAny.(*suiteScopedCluster) - suiteCluster.once.Do(func() { - suiteCluster.cluster = p.createCluster(t, nil, true, nil) - }) - suiteCluster.cluster.SetT(t) - return suiteCluster.cluster + return suiteCluster.get(t, p, workerService) } -func (p *clusterPool) getDedicated(t *testing.T, dynamicConfig map[dynamicconfig.Key]any, clusterOpts []TestClusterOption) *FunctionalTestBase { +func (p *clusterPool) getDedicated(t *testing.T, workerService bool, dynamicConfig map[dynamicconfig.Key]any, clusterOpts []TestClusterOption) *FunctionalTestBase { + key := clusterPoolKey{ + kind: poolKindDedicated, + workerService: workerService, + } if len(dynamicConfig) > 0 || len(clusterOpts) > 0 { // Custom config or fx options require a fresh cluster (can't 
reuse). - p.dedicated.acquireSlot(t) - cluster := p.createCluster(t, dynamicConfig, false, clusterOpts) + p.pools[key].acquireSlot(t) + cluster := p.createCluster(t, dynamicConfig, false, workerService, clusterOpts) // Register cleanup to tear down the cluster when the test completes. t.Cleanup(func() { @@ -218,17 +246,55 @@ func (p *clusterPool) getDedicated(t *testing.T, dynamicConfig map[dynamicconfig } // If no custom config is provided, reuse an existing cluster. - return p.dedicated.get(t, func() *FunctionalTestBase { - return p.createCluster(t, nil, false, nil) + return p.getPooled(t, key, nil, false, nil) +} + +func (p *clusterPool) acquireDedicatedSlot(t *testing.T, workerService bool) { + p.pools[clusterPoolKey{ + kind: poolKindDedicated, + workerService: workerService, + }].acquireSlot(t) +} + +func (p *clusterPool) getPooled(t *testing.T, key clusterPoolKey, dynamicConfig map[dynamicconfig.Key]any, shared bool, clusterOpts []TestClusterOption) *FunctionalTestBase { + return p.pools[key].get(t, func() *FunctionalTestBase { + return p.createCluster(t, dynamicConfig, shared, key.workerService, clusterOpts) + }) +} + +func (s *suiteScopedCluster) get(t *testing.T, clusterPool *clusterPool, workerService bool) *FunctionalTestBase { + key := clusterPoolKey{ + kind: poolKindShared, + workerService: workerService, + } + return s.pools[key].get(t, func() *FunctionalTestBase { + return clusterPool.createCluster(t, nil, true, workerService, nil) }) } -func (p *clusterPool) createCluster(t *testing.T, dynamicConfig map[dynamicconfig.Key]any, shared bool, clusterOpts []TestClusterOption) *FunctionalTestBase { +func (s *suiteScopedCluster) tearDown(t *testing.T) { + for _, pool := range s.pools { + pool.tearDown(t) + } +} + +func (p *pool) tearDown(t *testing.T) { + for idx, cluster := range p.clusters { + if cluster == nil { + continue + } + if err := cluster.testCluster.TearDownCluster(); err != nil { + t.Logf("Failed to tear down suite-scoped cluster %d: %v", idx, 
err) + } + } +} + +func (p *clusterPool) createCluster(t *testing.T, dynamicConfig map[dynamicconfig.Key]any, shared bool, workerService bool, clusterOpts []TestClusterOption) *FunctionalTestBase { tbase := &FunctionalTestBase{} tbase.SetT(t) // Keep the worker service off unless explicitly enabled via WithWorkerService. - opts := []TestClusterOption{withWorkerService(false)} + opts := []TestClusterOption{withWorkerService(workerService)} if shared { opts = append(opts, WithSharedCluster()) } diff --git a/tests/testcore/test_cluster_pool_test.go b/tests/testcore/test_cluster_pool_test.go index b7dd66ea09..5718e4065b 100644 --- a/tests/testcore/test_cluster_pool_test.go +++ b/tests/testcore/test_cluster_pool_test.go @@ -6,6 +6,8 @@ import ( "github.com/stretchr/testify/require" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/membership/static" + "go.temporal.io/server/common/primitives" ) func TestGlobalOverridesSurviveTestCleanup(t *testing.T) { @@ -14,6 +16,13 @@ func TestGlobalOverridesSurviveTestCleanup(t *testing.T) { t.Run("create", func(t *testing.T) { impl := newTemporal(t, &TemporalParams{ ClusterMetadataConfig: &cluster.Config{}, + HostsByProtocolByService: map[transferProtocol]map[primitives.ServiceName]static.Hosts{ + httpProtocol: { + primitives.FrontendService: { + All: []string{"127.0.0.1:0"}, + }, + }, + }, }) dcClient = impl.dcClient }) diff --git a/tests/testcore/test_env.go b/tests/testcore/test_env.go index 0da369e129..8eec924b6d 100644 --- a/tests/testcore/test_env.go +++ b/tests/testcore/test_env.go @@ -91,6 +91,7 @@ type TestOption func(*testOptions) type testOptions struct { dedicatedCluster bool dedicatedReason string + workerService bool dynamicConfigSettings []dynamicConfigOverride clusterOptions []TestClusterOption } @@ -126,12 +127,10 @@ func WithFxOptions(serviceName primitives.ServiceName, opts ...fx.Option) TestOp } // WithWorkerService enables the system worker 
service. The service is off by -// default to avoid the worker overhead. This implies a dedicated cluster. -func WithWorkerService(reason string) TestOption { +// default to avoid the worker overhead. +func WithWorkerService(_ string) TestOption { return func(o *testOptions) { - o.dedicatedCluster = true - o.clusterOptions = append(o.clusterOptions, withWorkerService(true)) - o.dedicatedReason = "worker service required: " + reason + o.workerService = true } } @@ -188,7 +187,7 @@ func NewEnv(t *testing.T, opts ...TestOption) *TestEnv { } // Obtain the test cluster from the pool. - base := testClusterPool.get(t, options.dedicatedCluster, startupConfig, options.clusterOptions) + base := testClusterPool.get(t, options.dedicatedCluster, options.workerService, startupConfig, options.clusterOptions) cluster := base.GetTestCluster() // Create a dedicated namespace for the test to help with test isolation. diff --git a/tests/testcore/test_env_test.go b/tests/testcore/test_env_test.go index f8ad7234dd..1429665be5 100644 --- a/tests/testcore/test_env_test.go +++ b/tests/testcore/test_env_test.go @@ -28,6 +28,14 @@ func (s *TestEnvSuite) TestDedicatedClusterGuard_FailsWhenUnused() { `testcore.WithDedicatedCluster() was requested but no dedicated-cluster-only feature was used`) } +func (s *TestEnvSuite) TestWithWorkerServiceDoesNotRequireDedicatedCluster() { + var opts testOptions + WithWorkerService("test")(&opts) + + s.False(opts.dedicatedCluster) + s.True(opts.workerService) +} + func (s *TestEnvSuite) TestDedicatedClusterGuard_NoErrorAfterUse() { guard := newDedicatedClusterGuard(true) guard.record("global hook") diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 37d4d7ab2a..a9edb5f143 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "sync/atomic" "testing" "time" @@ -54,13 +55,16 @@ import ( type versionStatus int const ( - tqTypeWf = enumspb.TASK_QUEUE_TYPE_WORKFLOW 
- tqTypeAct = enumspb.TASK_QUEUE_TYPE_ACTIVITY - tqTypeNexus = enumspb.TASK_QUEUE_TYPE_NEXUS - vbUnspecified = enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED - vbPinned = enumspb.VERSIONING_BEHAVIOR_PINNED - vbUnpinned = enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE - ver3MinPollTime = common.MinLongPollTimeout + time.Millisecond*200 + tqTypeWf = enumspb.TASK_QUEUE_TYPE_WORKFLOW + tqTypeAct = enumspb.TASK_QUEUE_TYPE_ACTIVITY + tqTypeNexus = enumspb.TASK_QUEUE_TYPE_NEXUS + vbUnspecified = enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED + vbPinned = enumspb.VERSIONING_BEHAVIOR_PINNED + vbUnpinned = enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE + ver3MinPollTime = common.MinLongPollTimeout + time.Millisecond*200 + ver3PollTimeout = 2 * time.Minute + ver3RPCTimeout = 10 * time.Second + ver3RetryPollTimeout = 21 * time.Second versionStatusNil = versionStatus(0) versionStatusInactive = versionStatus(1) @@ -79,12 +83,13 @@ type Versioning3Suite struct { } func TestVersioning3FunctionalSuite(t *testing.T) { - testcore.UseSuiteScopedCluster(t) //nolint:staticcheck // SA1019: suite still requires legacy sequential execution - parallelsuite.RunLegacySequential(t, &Versioning3Suite{}) //nolint:staticcheck // SA1019: suite still requires legacy sequential execution + testcore.UseSuiteScopedClusters(t, testcore.DefaultSuiteClusterPoolSize()) + parallelsuite.Run(t, &Versioning3Suite{}) } func (s *Versioning3Suite) setupEnv(opts ...testcore.TestOption) *testcore.TestEnv { opts = append([]testcore.TestOption{ + testcore.WithWorkerService("worker deployment manager workflows"), testcore.WithDynamicConfig(dynamicconfig.MatchingDeploymentWorkflowVersion, int(versioning3DeploymentWorkflowVersion)), // Make sure we don't hit the rate limiter in tests @@ -106,7 +111,7 @@ func (s *Versioning3Suite) setupEnv(opts ...testcore.TestOption) *testcore.TestE func (s *Versioning3Suite) runTestWithMatchingBehavior(testFn func(*testcore.TestEnv, *Versioning3Suite), opts ...testcore.TestOption) { for _, behavior := range 
testcore.AllMatchingBehaviors() { behavior := behavior - s.Run(behavior.Name(), func(s *Versioning3Suite) { + s.RunSequential(behavior.Name(), func(s *Versioning3Suite) { envOpts := append([]testcore.TestOption{}, opts...) envOpts = append(envOpts, behavior.Options()...) env := s.setupEnv(envOpts...) @@ -127,16 +132,10 @@ func (s *Versioning3Suite) TestPinnedTask_NoProperPoller() { tv2 := tv.WithBuildIDNumber(2) go s.idlePollWorkflow(env, s.Context(), tv2, true, ver3MinPollTime, "second deployment should not receive pinned task") - // Start a versioned poller for the first version so that it registers the version in the task queue. - pollerCtx, cancelPoller := context.WithCancel(s.Context()) - go s.idlePollWorkflow(env, pollerCtx, tv, true, ver3MinPollTime, "first deployment should not receive any task. It is just creating a version in the task queue.") - - // Wait for the version to be present in the task queue + // Register the first version before starting a workflow pinned to it. + s.pollUntilRegistered(env, tv) s.validatePinnedVersionExistsInTaskQueue(env, tv) - // Cancel the poller after condition is met - cancelPoller() - s.startWorkflow(env, tv, tv.VersioningOverridePinned()) s.idlePollWorkflow(env, s.Context(), tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task") @@ -323,14 +322,15 @@ func (s *Versioning3Suite) testWorkflowWithPinnedOverride(env *testcore.TestEnv, s.verifyWorkflowStickyQueue(env, tv.WithRunID(runID)) } - env.WaitForChannel(actCompleted) - s.verifyWorkflowVersioning(env, tv, vbUnpinned, tv.Deployment(), tv.VersioningOverridePinned(), nil) - - s.pollWftAndHandle(env, tv, sticky, nil, + finalWFTCompleted := make(chan struct{}) + s.pollWftAndHandle(env, tv, sticky, finalWFTCompleted, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) return respondCompleteWorkflow(tv, vbUnpinned), nil }) + + env.WaitForChannel(actCompleted) + 
env.WaitForChannel(finalWFTCompleted) s.verifyWorkflowVersioning(env, tv, vbUnpinned, tv.Deployment(), tv.VersioningOverridePinned(), nil) } @@ -378,13 +378,8 @@ func (s *Versioning3Suite) testPinnedQueryDrainedVersion(env *testcore.TestEnv, tv := env.Tv() // create version v1 and make it current - idlePollerDone := make(chan struct{}) - go func() { - s.idlePollWorkflow(env, s.Context(), tv, true, ver3MinPollTime, "should not have gotten any tasks since there are none") - close(idlePollerDone) - }() + s.pollUntilRegistered(env, tv) s.setCurrentDeployment(env, tv) - env.WaitForChannel(idlePollerDone) wftCompleted := make(chan struct{}) s.pollWftAndHandle(env, tv, false, wftCompleted, @@ -398,24 +393,28 @@ func (s *Versioning3Suite) testPinnedQueryDrainedVersion(env *testcore.TestEnv, s.verifyWorkflowVersioning(env, tv, vbPinned, tv.Deployment(), tv.VersioningOverridePinned(), nil) // create version v2 and make it current which shall make v1 go from current -> draining/drained - idlePollerDone = make(chan struct{}) tv2 := tv.WithBuildIDNumber(2) - go func() { - s.idlePollWorkflow(env, s.Context(), tv2, true, ver3MinPollTime, "should not have gotten any tasks since there are none") - close(idlePollerDone) - }() + s.pollUntilRegistered(env, tv2) s.setCurrentDeployment(env, tv2) - env.WaitForChannel(idlePollerDone) // wait for v1 to become drained - s.Await(func(s *Versioning3Suite) { - resp, err := env.FrontendClient().DescribeWorkerDeploymentVersion(s.Context(), &workflowservice.DescribeWorkerDeploymentVersionRequest{ + s.Awaitf(func(s *Versioning3Suite) { + ctx, cancel := context.WithTimeout(s.Context(), ver3RPCTimeout) + defer cancel() + + resp, err := env.FrontendClient().DescribeWorkerDeploymentVersion(ctx, &workflowservice.DescribeWorkerDeploymentVersionRequest{ Namespace: env.Namespace().String(), Version: tv.DeploymentVersionString(), }) - s.NoError(err) - s.Equal(enumspb.VERSION_DRAINAGE_STATUS_DRAINED, 
resp.GetWorkerDeploymentVersionInfo().GetDrainageInfo().GetStatus()) - }, time.Second*10, time.Millisecond*1000) + s.NoError(err, "DescribeWorkerDeploymentVersion failed: version=%s rpc_ctx_err=%v await_ctx_err=%v", + tv.DeploymentVersionString(), ctx.Err(), s.Context().Err()) + actual := resp.GetWorkerDeploymentVersionInfo().GetDrainageInfo().GetStatus() + s.Equal(enumspb.VERSION_DRAINAGE_STATUS_DRAINED, actual, + "worker deployment version drainage status mismatch: version=%s info=%v", + tv.DeploymentVersionString(), resp.GetWorkerDeploymentVersionInfo()) + }, 90*time.Second, 500*time.Millisecond, + "wait for worker deployment version to drain: namespace=%s version=%s workflow_id=%s", + env.Namespace(), tv.DeploymentVersionString(), tv.WorkflowID()) if !pollersPresent { // simulate the pollers going away, which should make the query fail as now the version is drained + has no pollers polling it @@ -425,9 +424,14 @@ func (s *Versioning3Suite) testPinnedQueryDrainedVersion(env *testcore.TestEnv, versionStr = worker_versioning.ExternalWorkerDeploymentVersionToString(worker_versioning.ExternalWorkerDeploymentVersionFromDeployment(tv.Deployment())) } - _, err := s.queryWorkflow(env, tv) - s.Error(err) - s.ErrorContains(err, fmt.Sprintf(matching.ErrBlackholedQuery, versionStr, versionStr)) + s.Awaitf(func(s *Versioning3Suite) { + _, err := s.queryWorkflow(env, tv) + s.ErrorContains(err, fmt.Sprintf(matching.ErrBlackholedQuery, versionStr, versionStr), + "query did not return drained-version blackhole error: version=%s actual_err=%v await_ctx_err=%v", + versionStr, err, s.Context().Err()) + }, 30*time.Second, 500*time.Millisecond, + "wait for pinned query to report drained version: namespace=%s workflow_id=%s version=%s", + env.Namespace(), tv.WorkflowID(), versionStr) } else { // since the version still has pollers, the query should succeed s.pollAndQueryWorkflow(env, tv, false) @@ -438,14 +442,23 @@ func (s *Versioning3Suite) testPinnedQueryDrainedVersion(env 
*testcore.TestEnv, s.setRampingDeployment(env, tv, 50, false) // wait for v1 to become ramping - s.Await(func(s *Versioning3Suite) { - resp, err := env.FrontendClient().DescribeWorkerDeploymentVersion(s.Context(), &workflowservice.DescribeWorkerDeploymentVersionRequest{ + s.Awaitf(func(s *Versioning3Suite) { + ctx, cancel := context.WithTimeout(s.Context(), ver3RPCTimeout) + defer cancel() + + resp, err := env.FrontendClient().DescribeWorkerDeploymentVersion(ctx, &workflowservice.DescribeWorkerDeploymentVersionRequest{ Namespace: env.Namespace().String(), Version: tv.DeploymentVersionString(), }) - s.NoError(err) - s.Equal(enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_RAMPING, resp.GetWorkerDeploymentVersionInfo().GetStatus()) - }, time.Second*10, time.Millisecond*1000) + s.NoError(err, "DescribeWorkerDeploymentVersion failed: version=%s rpc_ctx_err=%v await_ctx_err=%v", + tv.DeploymentVersionString(), ctx.Err(), s.Context().Err()) + actual := resp.GetWorkerDeploymentVersionInfo().GetStatus() + s.Equal(enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_RAMPING, actual, + "worker deployment version status mismatch: version=%s info=%v", + tv.DeploymentVersionString(), resp.GetWorkerDeploymentVersionInfo()) + }, 90*time.Second, 500*time.Millisecond, + "wait for worker deployment version to ramp: namespace=%s version=%s workflow_id=%s", + env.Namespace(), tv.DeploymentVersionString(), tv.WorkflowID()) // the ramping status is propagated to the task queues s.waitForDeploymentDataPropagation(env, tv, versionStatusRamping, false, tqTypeWf) @@ -470,17 +483,7 @@ func (s *Versioning3Suite) testQueryWithPinnedOverride(env *testcore.TestEnv, st return respondEmptyWft(tv, sticky, vbUnpinned), nil }) - // Wait for the version to be present in the task queue. Version existence is required before it can be set as an override. 
- s.Await(func(s *Versioning3Suite) { - resp, err := env.GetTestCluster().MatchingClient().CheckTaskQueueVersionMembership(s.Context(), &matchingservice.CheckTaskQueueVersionMembershipRequest{ - NamespaceId: env.NamespaceID().String(), - TaskQueue: tv.TaskQueue().GetName(), - TaskQueueType: tqTypeWf, - Version: worker_versioning.DeploymentVersionFromDeployment(tv.Deployment()), - }) - s.NoError(err) - s.True(resp.GetIsMember()) - }, 10*time.Second, 500*time.Millisecond) + s.validatePinnedVersionExistsInTaskQueue(env, tv) runID := s.startWorkflow(env, tv, tv.VersioningOverridePinned()) @@ -598,7 +601,7 @@ func (s *Versioning3Suite) testPinnedWorkflowWithLateActivityPoller(env *testcor // When the first activity poller arrives from this deployment, it registers the TQ in the // deployment and that will trigger reevaluation of backlog queue. - s.pollActivityAndHandle(env, tv, nil, + s.pollActivityAndHandleEventually(env, tv, func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { s.NotNil(task) return respondActivity(), nil @@ -659,14 +662,15 @@ func (s *Versioning3Suite) testUnpinnedWorkflow(env *testcore.TestEnv, sticky bo s.verifyWorkflowStickyQueue(env, tv.WithRunID(runID)) } - env.WaitForChannel(actCompleted) - s.verifyWorkflowVersioning(env, tv, vbUnpinned, tv.Deployment(), nil, nil) - - s.pollWftAndHandle(env, tv, sticky, nil, + finalWFTCompleted := make(chan struct{}) + s.pollWftAndHandle(env, tv, sticky, finalWFTCompleted, func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.NotNil(task) return respondCompleteWorkflow(tv, vbUnpinned), nil }) + + env.WaitForChannel(actCompleted) + env.WaitForChannel(finalWFTCompleted) s.verifyWorkflowVersioning(env, tv, vbUnpinned, tv.Deployment(), nil, nil) } @@ -1211,7 +1215,7 @@ func (s *Versioning3Suite) testUnpinnedWorkflowWithRamp(env *testcore.TestEnv, t // wait until all task queue 
partitions know that tv2 is ramping s.waitForDeploymentDataPropagation(env, tv2, versionStatusRamping, toUnversioned, tqTypeWf, tqTypeAct) - numTests := 50 + numTests := 20 counter := make(map[string]int) runs := make([]sdkclient.WorkflowRun, numTests) for i := range numTests { @@ -1408,7 +1412,7 @@ func (s *Versioning3Suite) testDoubleTransition(unversionedSrc bool, signal bool Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING, }}, []string{}, tqTypeWf, tqTypeAct) // poll activity from v2 worker, this should start a transition but should not immediately start the activity. - go s.idlePollActivity(env, tv2, true, time.Minute, "v2 worker should not receive the activity") + go s.idlePollActivity(s.Context(), env, tv2, true, time.Minute, "v2 worker should not receive the activity") s.Await(func(s *Versioning3Suite) { dwf, err := env.FrontendClient().DescribeWorkflowExecution( @@ -1421,7 +1425,7 @@ func (s *Versioning3Suite) testDoubleTransition(unversionedSrc bool, signal bool ) s.NoError(err) s.Equal(tv2.DeploymentVersionTransition(), dwf.WorkflowExecutionInfo.GetVersioningInfo().GetVersionTransition()) - }, 10*time.Second, 500*time.Millisecond) + }, 30*time.Second, 500*time.Millisecond) // Back to sourceV if unversionedSrc { @@ -1714,7 +1718,7 @@ func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { time.Sleep(time.Millisecond * 200) //nolint:forbidigo // Pollers of d1 are there, but should not get any task - go s.idlePollActivity(env, tv1, true, ver3MinPollTime, "activities should not go to the old deployment") + go s.idlePollActivity(s.Context(), env, tv1, true, ver3MinPollTime, "activities should not go to the old deployment") act2To4Err := make(chan error, 1) go func() { @@ -1909,16 +1913,13 @@ func (s *Versioning3Suite) testChildWorkflowInheritanceExpectInherit(crossTq boo } wfStarted := make(chan struct{}, 1) - currentChanged := make(chan struct{}, 1) childv1 := func(ctx workflow.Context) (string, error) { - 
s.verifyWorkflowVersioning(env, tv1Child, vbPinned, tv1Child.Deployment(), override, nil) return "v1", nil } wf1 := func(ctx workflow.Context) (string, error) { wfStarted <- struct{}{} - // wait for current version to change - <-currentChanged + workflow.GetSignalChannel(ctx, "currentVersionChanged").Receive(ctx, nil) // run two child workflows fut1 := workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ @@ -1928,7 +1929,6 @@ func (s *Versioning3Suite) testChildWorkflowInheritanceExpectInherit(crossTq boo var val1 string s.NoError(fut1.Get(ctx, &val1)) - s.verifyWorkflowVersioning(env, tv1, parentRegistrationBehavior, tv1.Deployment(), override, nil) return val1, nil } @@ -1963,12 +1963,13 @@ func (s *Versioning3Suite) testChildWorkflowInheritanceExpectInherit(crossTq boo // v1 is current for both parent and child s.setCurrentDeployment(env, tv1) + s.pollUntilRegistered(env, tv1Child) startOpts := sdkclient.StartWorkflowOptions{ ID: tv1.WorkflowID(), TaskQueue: tv1.TaskQueue().GetName(), VersioningOverride: nil, - WorkflowTaskTimeout: 10 * time.Second, + WorkflowTaskTimeout: 30 * time.Second, } if withOverride { startOpts.VersioningOverride = &sdkclient.PinnedVersioningOverride{ @@ -1980,29 +1981,44 @@ func (s *Versioning3Suite) testChildWorkflowInheritanceExpectInherit(crossTq boo // wait for it to start on v1 env.WaitForChannel(wfStarted) close(wfStarted) // force panic if replayed + s.verifyWorkflowVersioning(env, tv1, parentRegistrationBehavior, tv1.Deployment(), override, nil) // make v2 current for both parent and child and unblock the wf to start the child s.updateTaskQueueDeploymentDataWithRoutingConfig(env, tv2, &deploymentpb.RoutingConfig{ CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv2.DeploymentVersionString()), CurrentVersionChangedTime: timestamp.TimePtr(time.Now()), RevisionNumber: 2, - }, 
map[string]*deploymentspb.WorkerDeploymentVersionData{tv2.DeploymentVersion().GetBuildId(): { - Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, - }}, []string{}, tqTypeWf) + }, map[string]*deploymentspb.WorkerDeploymentVersionData{ + tv2.DeploymentVersion().GetBuildId(): { + Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, + }, + tv1.DeploymentVersion().GetBuildId(): { + Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING, + }, + }, []string{}, tqTypeWf) + s.pollUntilRegistered(env, tv1) if crossTq { s.updateTaskQueueDeploymentDataWithRoutingConfig(env, tv2Child, &deploymentpb.RoutingConfig{ CurrentDeploymentVersion: worker_versioning.ExternalWorkerDeploymentVersionFromStringV31(tv2Child.DeploymentVersionString()), CurrentVersionChangedTime: timestamp.TimePtr(time.Now()), RevisionNumber: 2, - }, map[string]*deploymentspb.WorkerDeploymentVersionData{tv2Child.DeploymentVersion().GetBuildId(): { - Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, - }}, []string{}, tqTypeWf) + }, map[string]*deploymentspb.WorkerDeploymentVersionData{ + tv2Child.DeploymentVersion().GetBuildId(): { + Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_CURRENT, + }, + tv1Child.DeploymentVersion().GetBuildId(): { + Status: enumspb.WORKER_DEPLOYMENT_VERSION_STATUS_DRAINING, + }, + }, []string{}, tqTypeWf) + s.pollUntilRegistered(env, tv1Child) } - currentChanged <- struct{}{} + s.NoError(env.SdkClient().SignalWorkflow(s.Context(), run.GetID(), run.GetRunID(), "currentVersionChanged", nil)) var out string s.NoError(run.Get(s.Context(), &out)) s.Equal("v1", out) + s.verifyWorkflowVersioning(env, tv1, parentRegistrationBehavior, tv1.Deployment(), override, nil) + s.verifyWorkflowVersioning(env, tv1Child, vbPinned, tv1Child.Deployment(), override, nil) } func (s *Versioning3Suite) TestChildWorkflowInheritance_UnpinnedParent() { @@ -2078,6 +2094,7 @@ func (s *Versioning3Suite) testChildWorkflowInheritanceExpectNoInherit(crossTq b Namespace: env.Namespace().String(), 
}) s.NoError(err) + defer sdkClient.Close() w1 := worker.New(sdkClient, tv1.TaskQueue().GetName(), worker.Options{ DeploymentOptions: worker.DeploymentOptions{ @@ -2309,7 +2326,7 @@ func (s *Versioning3Suite) testPinnedCaNUpgradeOnCaN(normalTask, speculativeTask // Mode-specific validations historyEvents := task.History.GetEvents() if speculativeTask { - s.verifySpeculativeTask(env, execution) + s.verifySpeculativeTask(task) } else if transientTask { s.verifyTransientTask(task) // Get events from server-side history, this includes transient events. @@ -2480,7 +2497,7 @@ func (s *Versioning3Suite) testPinnedCaNUseRampingVersionOnCaN(pinnedOverride, n if !noRampingVersion { // Register v2 poller before setting it as ramping - s.idlePollWorkflow(env, s.Context(), tv2, true, ver3MinPollTime, "should not get any tasks yet") + s.pollUntilRegistered(env, tv2) // Set v2 as ramping at 0%: no workflows move via hash, only via UseRampingVersion CaN s.setRampingDeployment(env, tv2, 0, false) s.waitForDeploymentDataPropagation(env, tv2, versionStatusRamping, false, tqTypeWf) @@ -2566,7 +2583,7 @@ func (s *Versioning3Suite) TestPinnedCaN_UseRampingVersionOnCaN_SubsequentWFTGoe func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { return respondEmptyWft(tv1, false, vbPinned), nil }) - s.idlePollWorkflow(env, s.Context(), tv2, true, ver3MinPollTime, "should not get any tasks yet") + s.pollUntilRegistered(env, tv2) s.setRampingDeployment(env, tv2, 0, false) s.waitForDeploymentDataPropagation(env, tv2, versionStatusRamping, false, tqTypeWf) s.triggerNormalWFT(env, tv1, execution) @@ -2663,7 +2680,7 @@ func (s *Versioning3Suite) TestPinnedCaN_UseRampingVersionOnCaN_RetryInheritsIni func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { return respondEmptyWft(tv1, false, vbPinned), nil }) - s.idlePollWorkflow(env, s.Context(), tv2, true, 
ver3MinPollTime, "should not get any tasks yet") + s.pollUntilRegistered(env, tv2) s.setRampingDeployment(env, tv2, 0, false) s.waitForDeploymentDataPropagation(env, tv2, versionStatusRamping, false, tqTypeWf) s.triggerNormalWFT(env, tv1, execution) @@ -2740,7 +2757,7 @@ func (s *Versioning3Suite) TestPinnedCaN_UseRampingVersionOnCaN_ChildDoesNotInhe func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { return respondEmptyWft(tv1, false, vbPinned), nil }) - s.idlePollWorkflow(env, s.Context(), tv2, true, ver3MinPollTime, "should not get any tasks yet") + s.pollUntilRegistered(env, tv2) s.setRampingDeployment(env, tv2, 0, false) s.waitForDeploymentDataPropagation(env, tv2, versionStatusRamping, false, tqTypeWf) s.triggerNormalWFT(env, tv1, execution) @@ -2853,9 +2870,8 @@ func (s *Versioning3Suite) triggerTransientWFT(env *testcore.TestEnv, tv *testva } // Verify this is a speculative task - events not yet in persisted history -func (s *Versioning3Suite) verifySpeculativeTask(env *testcore.TestEnv, execution *commonpb.WorkflowExecution) { - events := env.GetHistory(env.Namespace().String(), execution) - s.EqualHistoryEvents(` +func (s *Versioning3Suite) verifySpeculativeTask(task *workflowservice.PollWorkflowTaskQueueResponse) { + s.EqualHistory(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled 3 WorkflowTaskStarted @@ -2866,7 +2882,7 @@ func (s *Versioning3Suite) verifySpeculativeTask(env *testcore.TestEnv, executio 8 WorkflowTaskCompleted 9 WorkflowTaskScheduled 10 WorkflowTaskStarted - `, events) + `, task.History) } func (s *Versioning3Suite) verifyTransientTask(task *workflowservice.PollWorkflowTaskQueueResponse) { @@ -3021,7 +3037,6 @@ func (s *Versioning3Suite) testCan(crossTq bool, behavior enumspb.VersioningBeha } wfStarted := make(chan struct{}, 10) - currentChanged := make(chan struct{}, 10) wf1 := func(ctx workflow.Context, attempt int) (string, error) { switch attempt { @@ -3040,8 
+3055,7 @@ func (s *Versioning3Suite) testCan(crossTq bool, behavior enumspb.VersioningBeha s.verifyWorkflowVersioning(env, tv1, vbUnspecified, nil, override, tv1.DeploymentVersionTransition()) } wfStarted <- struct{}{} - // wait for current version to change. - <-currentChanged + workflow.GetSignalChannel(ctx, "currentVersionChanged").Receive(ctx, nil) return "", workflow.NewContinueAsNewError(newCtx, "wf", attempt+1) case 1: s.verifyWorkflowVersioning(env, tv1, vbPinned, tv1.Deployment(), override, nil) @@ -3069,6 +3083,7 @@ func (s *Versioning3Suite) testCan(crossTq bool, behavior enumspb.VersioningBeha Namespace: env.Namespace().String(), }) s.NoError(err) + defer sdkClient.Close() if crossTq && expectPinnedInherit { w1xtq := worker.New(sdkClient, canxTq, worker.Options{ @@ -3148,7 +3163,7 @@ func (s *Versioning3Suite) testCan(crossTq bool, behavior enumspb.VersioningBeha // make v2 current s.setCurrentDeployment(env, tv2) - currentChanged <- struct{}{} + s.NoError(sdkClient.SignalWorkflow(s.Context(), run.GetID(), run.GetRunID(), "currentVersionChanged", nil)) var out string s.NoError(run.Get(s.Context(), &out)) @@ -3429,23 +3444,29 @@ func (s *Versioning3Suite) TestSyncDeploymentUserDataWithRoutingConfig_Update() } func (s *Versioning3Suite) setCurrentDeployment(env *testcore.TestEnv, tv *testvars.TestVars) { - ctx, cancel := context.WithTimeout(s.Context(), 60*time.Second) - defer cancel() - failedPrecondition := serviceerror.NewFailedPreconditionf(workerdeployment.ErrCurrentVersionDoesNotHaveAllTaskQueues, tv.DeploymentVersionStringV32()).Error() - s.Await(func(s *Versioning3Suite) { + buildIDNotFound := fmt.Sprintf("build ID '%s' not found in Worker Deployment", tv.BuildID()) + deploymentNotFound := fmt.Sprintf("no Worker Deployment found with name '%s'", tv.DeploymentSeries()) + s.Awaitf(func(s *Versioning3Suite) { + ctx, cancel := context.WithTimeout(s.Context(), ver3RPCTimeout) + defer cancel() + req := 
&workflowservice.SetWorkerDeploymentCurrentVersionRequest{ Namespace: env.Namespace().String(), DeploymentName: tv.DeploymentSeries(), } req.BuildId = tv.BuildID() _, err := env.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, req) - if s.shouldRetryWorkerDeploymentRPC(env, err, failedPrecondition) { - s.NoError(err) + if s.shouldRetryWorkerDeploymentRPC(env, err, failedPrecondition, buildIDNotFound, deploymentNotFound) { + s.NoError(err, "retryable SetWorkerDeploymentCurrentVersion failure: deployment=%s build_id=%s rpc_ctx_err=%v await_ctx_err=%v", + tv.DeploymentSeries(), tv.BuildID(), ctx.Err(), s.Context().Err()) return } - s.NoError(err) - }, 60*time.Second, 500*time.Millisecond) + s.NoError(err, "SetWorkerDeploymentCurrentVersion failed: deployment=%s build_id=%s rpc_ctx_err=%v await_ctx_err=%v", + tv.DeploymentSeries(), tv.BuildID(), ctx.Err(), s.Context().Err()) + }, 90*time.Second, 500*time.Millisecond, + "set current worker deployment: namespace=%s deployment=%s build_id=%s version=%s", + env.Namespace(), tv.DeploymentSeries(), tv.BuildID(), tv.DeploymentVersionString()) // Wait for propagation to complete since we have tests using async entity workflows to set the current version s.waitForDeploymentDataPropagationQueryWorkerDeployment(env, tv) @@ -3455,71 +3476,97 @@ func (s *Versioning3Suite) setCurrentDeployment(env *testcore.TestEnv, tv *testv // tqTypes controls which task queue types to poll; it defaults to workflow only. // Pollers run continuously until all TQ types are registered. func (s *Versioning3Suite) pollUntilRegistered(env *testcore.TestEnv, tv *testvars.TestVars, tqTypes ...enumspb.TaskQueueType) { + stopPollers := s.startRegistrationPollers(env, tv, tqTypes...) + defer stopPollers() + + s.waitForDeploymentVersionRegistration(env, tv, tqTypes...) 
+} + +func (s *Versioning3Suite) startRegistrationPollers(env *testcore.TestEnv, tv *testvars.TestVars, tqTypes ...enumspb.TaskQueueType) func() { if len(tqTypes) == 0 { tqTypes = []enumspb.TaskQueueType{tqTypeWf} } pollCtx, cancel := context.WithCancel(s.Context()) + var wg sync.WaitGroup for _, tqType := range tqTypes { tqType := tqType - go func() { + wg.Go(func() { for pollCtx.Err() == nil { switch tqType { case tqTypeWf: s.idlePollWorkflow(env, pollCtx, tv, true, ver3MinPollTime, "should not get any tasks yet") case tqTypeAct: - s.idlePollActivity(env, tv, true, ver3MinPollTime, "should not get any tasks yet") + s.idlePollActivity(pollCtx, env, tv, true, ver3MinPollTime, "should not get any tasks yet") case tqTypeNexus: s.idlePollNexus(env, pollCtx, tv, true, ver3MinPollTime, "should not get any tasks yet") default: panic("invalid task queue type") } } - }() + }) } - // Wait until the version is visible and all requested task queue types are registered. - s.Await(func(s *Versioning3Suite) { - resp, err := env.FrontendClient().DescribeWorkerDeploymentVersion(s.Context(), &workflowservice.DescribeWorkerDeploymentVersionRequest{ - Namespace: env.Namespace().String(), - Version: tv.DeploymentVersionString(), - }) - var notFound *serviceerror.NotFound - if errors.As(err, &notFound) { - s.NoError(err) - return + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + return func() { + cancel() + select { + case <-done: + case <-s.Context().Done(): + s.FailNow("context timeout while stopping registration pollers") } - s.NoError(err) - tqName := tv.TaskQueue().GetName() + } +} + +func (s *Versioning3Suite) waitForDeploymentVersionRegistration(env *testcore.TestEnv, tv *testvars.TestVars, tqTypes ...enumspb.TaskQueueType) { + if len(tqTypes) == 0 { + tqTypes = []enumspb.TaskQueueType{tqTypeWf} + } + s.Awaitf(func(s *Versioning3Suite) { + ctx, cancel := context.WithTimeout(s.Context(), 30*time.Second) + defer cancel() + for _, tqType := range tqTypes
{ - found := false - for _, tq := range resp.GetVersionTaskQueues() { - if tq.GetName() == tqName && tq.GetType() == tqType { - found = true - break - } - } - s.True(found) + resp, err := env.GetTestCluster().MatchingClient().CheckTaskQueueVersionMembership(ctx, &matchingservice.CheckTaskQueueVersionMembershipRequest{ + NamespaceId: env.NamespaceID().String(), + TaskQueue: tv.TaskQueue().GetName(), + TaskQueueType: tqType, + Version: worker_versioning.DeploymentVersionFromDeployment(tv.Deployment()), + }) + s.NoError(err, "CheckTaskQueueVersionMembership failed: task_queue=%s type=%s version=%s rpc_ctx_err=%v await_ctx_err=%v", + tv.TaskQueue().GetName(), tqType, tv.DeploymentVersionString(), ctx.Err(), s.Context().Err()) + s.True(resp.GetIsMember(), + "task queue version membership not observed: task_queue=%s type=%s version=%s response=%v", + tv.TaskQueue().GetName(), tqType, tv.DeploymentVersionString(), resp) } - }, 30*time.Second, 500*time.Millisecond) - cancel() + }, 90*time.Second, 500*time.Millisecond, + "wait for deployment version registration: namespace=%s task_queue=%s version=%s tq_types=%v", + env.Namespace(), tv.TaskQueue().GetName(), tv.DeploymentVersionString(), tqTypes) } func (s *Versioning3Suite) unsetCurrentDeployment(env *testcore.TestEnv, tv *testvars.TestVars) { - ctx, cancel := context.WithTimeout(s.Context(), 60*time.Second) - defer cancel() + deploymentNotFound := fmt.Sprintf("no Worker Deployment found with name '%s'", tv.DeploymentSeries()) + s.Awaitf(func(s *Versioning3Suite) { + ctx, cancel := context.WithTimeout(s.Context(), 30*time.Second) + defer cancel() - s.Await(func(s *Versioning3Suite) { req := &workflowservice.SetWorkerDeploymentCurrentVersionRequest{ Namespace: env.Namespace().String(), DeploymentName: tv.DeploymentSeries(), } _, err := env.FrontendClient().SetWorkerDeploymentCurrentVersion(ctx, req) - if s.shouldRetryWorkerDeploymentRPC(env, err) { + if s.shouldRetryWorkerDeploymentRPC(env, err, deploymentNotFound) { 
s.NoError(err) return } s.NoError(err) - }, 60*time.Second, 500*time.Millisecond) + }, 90*time.Second, 500*time.Millisecond, + "unset current worker deployment: namespace=%s deployment=%s version=%s", + env.Namespace(), tv.DeploymentSeries(), tv.DeploymentVersionString()) // Wait for propagation to complete since we have tests using async entity workflows to set the current version s.waitForDeploymentDataPropagationQueryWorkerDeployment(env, tv) @@ -3532,16 +3579,18 @@ func (s *Versioning3Suite) setRampingDeployment( percentage float32, rampUnversioned bool, ) { - ctx, cancel := context.WithTimeout(s.Context(), 60*time.Second) - defer cancel() - bid := tv.BuildID() if rampUnversioned { bid = "" } failedPrecondition := serviceerror.NewFailedPreconditionf(workerdeployment.ErrRampingVersionDoesNotHaveAllTaskQueues, tv.DeploymentVersionStringV32()).Error() + buildIDNotFound := fmt.Sprintf("build ID '%s' not found in Worker Deployment", tv.BuildID()) + deploymentNotFound := fmt.Sprintf("no Worker Deployment found with name '%s'", tv.DeploymentSeries()) + + s.Awaitf(func(s *Versioning3Suite) { + ctx, cancel := context.WithTimeout(s.Context(), 30*time.Second) + defer cancel() - s.Await(func(s *Versioning3Suite) { req := &workflowservice.SetWorkerDeploymentRampingVersionRequest{ Namespace: env.Namespace().String(), DeploymentName: tv.DeploymentSeries(), @@ -3549,12 +3598,16 @@ func (s *Versioning3Suite) setRampingDeployment( } req.BuildId = bid _, err := env.FrontendClient().SetWorkerDeploymentRampingVersion(ctx, req) - if s.shouldRetryWorkerDeploymentRPC(env, err, failedPrecondition) { - s.NoError(err) + if s.shouldRetryWorkerDeploymentRPC(env, err, failedPrecondition, buildIDNotFound, deploymentNotFound) { + s.NoError(err, "retryable SetWorkerDeploymentRampingVersion failure: deployment=%s build_id=%s percentage=%v rpc_ctx_err=%v await_ctx_err=%v", + tv.DeploymentSeries(), bid, percentage, ctx.Err(), s.Context().Err()) return } - s.NoError(err) - }, 60*time.Second, 
500*time.Millisecond) + s.NoError(err, "SetWorkerDeploymentRampingVersion failed: deployment=%s build_id=%s percentage=%v rpc_ctx_err=%v await_ctx_err=%v", + tv.DeploymentSeries(), bid, percentage, ctx.Err(), s.Context().Err()) + }, 90*time.Second, 500*time.Millisecond, + "set ramping worker deployment: namespace=%s deployment=%s build_id=%s version=%s percentage=%v ramp_unversioned=%v", + env.Namespace(), tv.DeploymentSeries(), bid, tv.DeploymentVersionString(), percentage, rampUnversioned) // Wait for propagation to complete since we have tests using async entity workflows to set the current version s.waitForDeploymentDataPropagationQueryWorkerDeployment(env, tv) @@ -3562,18 +3615,28 @@ func (s *Versioning3Suite) setRampingDeployment( func (s *Versioning3Suite) waitForDeploymentDataPropagationQueryWorkerDeployment(env *testcore.TestEnv, tv *testvars.TestVars) { if versioning3DeploymentWorkflowVersion == workerdeployment.AsyncSetCurrentAndRamping { - s.Await(func(s *Versioning3Suite) { - resp, err := env.FrontendClient().DescribeWorkerDeployment(s.Context(), &workflowservice.DescribeWorkerDeploymentRequest{ + s.Awaitf(func(s *Versioning3Suite) { + ctx, cancel := context.WithTimeout(s.Context(), 30*time.Second) + defer cancel() + + resp, err := env.FrontendClient().DescribeWorkerDeployment(ctx, &workflowservice.DescribeWorkerDeploymentRequest{ Namespace: env.Namespace().String(), DeploymentName: tv.DeploymentSeries(), }) if s.shouldRetryWorkerDeploymentRPC(env, err) { - s.NoError(err) + s.NoError(err, "retryable DescribeWorkerDeployment failure: deployment=%s rpc_ctx_err=%v await_ctx_err=%v", + tv.DeploymentSeries(), ctx.Err(), s.Context().Err()) return } - s.NoError(err) - s.Equal(enumspb.ROUTING_CONFIG_UPDATE_STATE_COMPLETED, resp.GetWorkerDeploymentInfo().GetRoutingConfigUpdateState()) - }, 10*time.Second, 500*time.Millisecond) + s.NoError(err, "DescribeWorkerDeployment failed: deployment=%s rpc_ctx_err=%v await_ctx_err=%v", + tv.DeploymentSeries(), ctx.Err(), 
s.Context().Err()) + actual := resp.GetWorkerDeploymentInfo().GetRoutingConfigUpdateState() + s.Equal(enumspb.ROUTING_CONFIG_UPDATE_STATE_COMPLETED, actual, + "worker deployment routing config update not complete: deployment=%s info=%v", + tv.DeploymentSeries(), resp.GetWorkerDeploymentInfo()) + }, 90*time.Second, 500*time.Millisecond, + "wait for worker deployment routing config propagation: namespace=%s deployment=%s version=%s", + env.Namespace(), tv.DeploymentSeries(), tv.DeploymentVersionString()) } } @@ -3726,7 +3789,7 @@ func (s *Versioning3Suite) rollbackTaskQueueToVersion( current, currentRevisionNumber, _, _, _, _, _, _ := worker_versioning.CalculateTaskQueueVersioningInfo(ms.GetUserData().GetData().GetPerType()[int32(tqTypeWf)].GetDeploymentData()) s.Equal(tv.DeploymentVersion().GetBuildId(), current.GetBuildId()) s.Equal(int64(0), currentRevisionNumber) - }, 10*time.Second, 500*time.Millisecond) + }, 30*time.Second, 500*time.Millisecond) } func (s *Versioning3Suite) syncTaskQueueDeploymentData( @@ -3831,50 +3894,62 @@ func (s *Versioning3Suite) verifyWorkflowVersioning(env *testcore.TestEnv, override *workflowpb.VersioningOverride, transition *workflowpb.DeploymentVersionTransition, ) { - dwf, err := env.FrontendClient().DescribeWorkflowExecution( - s.Context(), &workflowservice.DescribeWorkflowExecutionRequest{ - Namespace: env.Namespace().String(), - Execution: &commonpb.WorkflowExecution{ - WorkflowId: tv.WorkflowID(), - }, - }) - s.NoError(err) + s.Awaitf(func(s *Versioning3Suite) { + ctx, cancel := context.WithTimeout(s.Context(), ver3RPCTimeout) + defer cancel() - versioningInfo := dwf.WorkflowExecutionInfo.GetVersioningInfo() - s.Equal(behavior.String(), versioningInfo.GetBehavior().String()) - var v *deploymentspb.WorkerDeploymentVersion - if versioningInfo.GetVersion() != "" { //nolint:staticcheck // SA1019: worker versioning v0.31 - //nolint:staticcheck // SA1019: worker versioning v0.31 - v, err = 
worker_versioning.WorkerDeploymentVersionFromStringV31(versioningInfo.GetVersion()) - s.NoError(err) - s.NotNil(versioningInfo.GetDeploymentVersion()) // make sure we are always populating this whenever Version string is populated - } - if dv := versioningInfo.GetDeploymentVersion(); dv != nil { - v = worker_versioning.DeploymentVersionFromDeployment(worker_versioning.DeploymentFromExternalDeploymentVersion(dv)) - } - actualDeployment := worker_versioning.DeploymentFromDeploymentVersion(v) - if !deployment.Equal(actualDeployment) { - s.Fail(fmt.Sprintf("deployment version mismatch. expected: {%s}, actual: {%s}", - deployment, - actualDeployment, - )) - } + dwf, err := env.FrontendClient().DescribeWorkflowExecution( + ctx, &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: env.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: tv.WorkflowID(), + }, + }) + s.NoError(err, "DescribeWorkflowExecution failed: workflow_id=%s rpc_ctx_err=%v await_ctx_err=%v", + tv.WorkflowID(), ctx.Err(), s.Context().Err()) + + versioningInfo := dwf.WorkflowExecutionInfo.GetVersioningInfo() + s.Equal(behavior.String(), versioningInfo.GetBehavior().String(), + "workflow versioning behavior mismatch: workflow_id=%s versioning_info=%v execution_info=%v", + tv.WorkflowID(), versioningInfo, dwf.WorkflowExecutionInfo) + var v *deploymentspb.WorkerDeploymentVersion + if versioningInfo.GetVersion() != "" { //nolint:staticcheck // SA1019: worker versioning v0.31 + //nolint:staticcheck // SA1019: worker versioning v0.31 + v, err = worker_versioning.WorkerDeploymentVersionFromStringV31(versioningInfo.GetVersion()) + s.NoError(err) + s.NotNil(versioningInfo.GetDeploymentVersion()) // make sure we are always populating this whenever Version string is populated + } + if dv := versioningInfo.GetDeploymentVersion(); dv != nil { + v = worker_versioning.DeploymentVersionFromDeployment(worker_versioning.DeploymentFromExternalDeploymentVersion(dv)) + } + 
actualDeployment := worker_versioning.DeploymentFromDeploymentVersion(v) + if !deployment.Equal(actualDeployment) { + s.Fail(fmt.Sprintf("deployment version mismatch. expected: {%s}, actual: {%s}", + deployment, + actualDeployment, + ), "workflow_id=%s versioning_info=%v execution_info=%v", + tv.WorkflowID(), versioningInfo, dwf.WorkflowExecutionInfo) + } - // v0.32 override - s.Equal(override.GetAutoUpgrade(), versioningInfo.GetVersioningOverride().GetAutoUpgrade()) - s.Equal(override.GetPinned().GetVersion().GetBuildId(), versioningInfo.GetVersioningOverride().GetPinned().GetVersion().GetBuildId()) - s.Equal(override.GetPinned().GetVersion().GetDeploymentName(), versioningInfo.GetVersioningOverride().GetPinned().GetVersion().GetDeploymentName()) - s.Equal(override.GetPinned().GetBehavior(), versioningInfo.GetVersioningOverride().GetPinned().GetBehavior()) - if worker_versioning.OverrideIsPinned(override) { - s.Equal(override.GetPinned().GetVersion().GetDeploymentName(), dwf.WorkflowExecutionInfo.GetWorkerDeploymentName()) - } + // v0.32 override + s.Equal(override.GetAutoUpgrade(), versioningInfo.GetVersioningOverride().GetAutoUpgrade()) + s.Equal(override.GetPinned().GetVersion().GetBuildId(), versioningInfo.GetVersioningOverride().GetPinned().GetVersion().GetBuildId()) + s.Equal(override.GetPinned().GetVersion().GetDeploymentName(), versioningInfo.GetVersioningOverride().GetPinned().GetVersion().GetDeploymentName()) + s.Equal(override.GetPinned().GetBehavior(), versioningInfo.GetVersioningOverride().GetPinned().GetBehavior()) + if worker_versioning.OverrideIsPinned(override) { + s.Equal(override.GetPinned().GetVersion().GetDeploymentName(), dwf.WorkflowExecutionInfo.GetWorkerDeploymentName()) + } - if !versioningInfo.GetVersionTransition().Equal(transition) { - s.Fail(fmt.Sprintf("version transition mismatch. 
expected: {%s}, actual: {%s}", - transition, - versioningInfo.GetVersionTransition(), - )) - } + if !versioningInfo.GetVersionTransition().Equal(transition) { + s.Fail(fmt.Sprintf("version transition mismatch. expected: {%s}, actual: {%s}", + transition, + versioningInfo.GetVersionTransition(), + ), "workflow_id=%s versioning_info=%v execution_info=%v", + tv.WorkflowID(), versioningInfo, dwf.WorkflowExecutionInfo) + } + }, 90*time.Second, 500*time.Millisecond, + "verify workflow versioning: namespace=%s workflow_id=%s expected_behavior=%s expected_deployment=%v expected_override=%v expected_transition=%v", + env.Namespace(), tv.WorkflowID(), behavior, deployment, override, transition) } func respondActivity() *workflowservice.RespondActivityTaskCompletedRequest { @@ -4069,7 +4144,7 @@ func (s *Versioning3Suite) doPollWftAndHandle( DeploymentOptions: tv.WorkerDeploymentOptions(versioned), TaskQueue: tq, }, - ).HandleTask(tv, handler, taskpoller.WithTimeout(time.Minute)) + ).HandleTask(tv, handler, taskpoller.WithTimeout(ver3PollTimeout)) } if async == nil { resp, err := f() @@ -4102,7 +4177,7 @@ func (s *Versioning3Suite) pollWftAndHandleQueries( DeploymentOptions: tv.WorkerDeploymentOptions(true), TaskQueue: tq, }, - ).HandleLegacyQuery(tv, handler) + ).HandleLegacyQuery(tv, handler, taskpoller.WithTimeout(ver3PollTimeout)) } if async == nil { resp, err := f() @@ -4134,7 +4209,7 @@ func (s *Versioning3Suite) pollNexusTaskAndHandle( DeploymentOptions: tv.WorkerDeploymentOptions(true), TaskQueue: tq, }, - ).HandleTask(tv, handler, taskpoller.WithTimeout(10*time.Second)) + ).HandleTask(tv, handler, taskpoller.WithTimeout(ver3PollTimeout)) } if async == nil { resp, err := f() @@ -4166,6 +4241,17 @@ func (s *Versioning3Suite) pollActivityAndHandle( s.doPollActivityAndHandle(env, tv, true, async, handler) } +func (s *Versioning3Suite) pollActivityAndHandleEventually( + env *testcore.TestEnv, + tv *testvars.TestVars, + handler func(task 
*workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error), +) { + s.Await(func(s *Versioning3Suite) { + err := s.doPollActivityAndHandleErrWithTimeout(env, tv, true, ver3RetryPollTimeout, handler) + s.NoError(err) + }, 90*time.Second, 500*time.Millisecond) +} + func (s *Versioning3Suite) pollActivityAndHandleErr( env *testcore.TestEnv, tv *testvars.TestVars, @@ -4199,12 +4285,22 @@ func (s *Versioning3Suite) doPollActivityAndHandleErr( tv *testvars.TestVars, versioned bool, handler func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error), +) error { + return s.doPollActivityAndHandleErrWithTimeout(env, tv, versioned, ver3PollTimeout, handler) +} + +func (s *Versioning3Suite) doPollActivityAndHandleErrWithTimeout( + env *testcore.TestEnv, + tv *testvars.TestVars, + versioned bool, + timeout time.Duration, + handler func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error), ) error { poller := taskpoller.New(s.T(), env.FrontendClient(), env.Namespace().String()) _, err := poller.PollActivityTask( &workflowservice.PollActivityTaskQueueRequest{ DeploymentOptions: tv.WorkerDeploymentOptions(versioned), - }).HandleTask(tv, handler, taskpoller.WithTimeout(time.Minute)) + }).HandleTask(tv, handler, taskpoller.WithTimeout(timeout)) return err } @@ -4252,6 +4348,7 @@ func (s *Versioning3Suite) idlePollUnversionedActivity( } func (s *Versioning3Suite) idlePollActivity( + ctx context.Context, env *testcore.TestEnv, tv *testvars.TestVars, versioned bool, @@ -4272,7 +4369,7 @@ func (s *Versioning3Suite) idlePollActivity( return nil, nil }, taskpoller.WithTimeout(timeout), - taskpoller.WithContext(s.Context()), + taskpoller.WithContext(ctx), ) } @@ -4358,9 +4455,11 @@ func (s *Versioning3Suite) waitForDeploymentDataPropagation( } } f, err := tqid.NewTaskQueueFamily(env.NamespaceID().String(), 
tv.TaskQueue().GetName()) - s.Await(func(s *Versioning3Suite) { + s.Awaitf(func(s *Versioning3Suite) { + observed := make(map[partAndType]string, len(remaining)) for pt := range remaining { - s.NoError(err) + s.NoError(err, "NewTaskQueueFamily failed: namespace_id=%s task_queue=%s", + env.NamespaceID(), tv.TaskQueue().GetName()) partition := f.TaskQueue(pt.tp).NormalPartition(pt.part) // Use lower-level GetTaskQueueUserData instead of GetWorkerBuildIdCompatibility // here so that we can target activity queues. @@ -4371,11 +4470,14 @@ func (s *Versioning3Suite) waitForDeploymentDataPropagation( TaskQueue: partition.RpcName(), TaskQueueType: partition.TaskType(), }) - s.NoError(err) + s.NoError(err, "GetTaskQueueUserData failed: task_queue=%s partition=%d type=%s rpc_name=%s await_ctx_err=%v", + tv.TaskQueue().GetName(), pt.part, pt.tp, partition.RpcName(), s.Context().Err()) perTypes := res.GetUserData().GetData().GetPerType() if perTypes != nil { deploymentsData := perTypes[int32(pt.tp)].GetDeploymentData().GetDeploymentsData() workerDeploymentData := deploymentsData[tv.DeploymentVersion().GetDeploymentName()] + observed[pt] = fmt.Sprintf("has_per_type=true worker_data=%v deployment_data=%v", + workerDeploymentData, perTypes[int32(pt.tp)].GetDeploymentData()) if unversionedRamp { if perTypes[int32(pt.tp)].GetDeploymentData().GetUnversionedRampData() != nil { @@ -4420,10 +4522,16 @@ func (s *Versioning3Suite) waitForDeploymentDataPropagation( } } } + } else { + observed[pt] = "missing per-type deployment data" } } - s.Empty(remaining) - }, 30*time.Second, 500*time.Millisecond) + s.Empty(remaining, + "deployment data did not propagate: namespace=%s task_queue=%s version=%s expected_status=%v unversioned_ramp=%v remaining=%v observed=%v", + env.Namespace(), tv.TaskQueue().GetName(), tv.DeploymentVersionString(), status, unversionedRamp, remaining, observed) + }, 90*time.Second, 500*time.Millisecond, + "wait for task queue deployment data propagation: namespace=%s 
task_queue=%s version=%s expected_status=%v unversioned_ramp=%v tq_types=%v", + env.Namespace(), tv.TaskQueue().GetName(), tv.DeploymentVersionString(), status, unversionedRamp, tqTypes) } func (s *Versioning3Suite) validateBacklogCount( @@ -4432,25 +4540,31 @@ func (s *Versioning3Suite) validateBacklogCount( tqType enumspb.TaskQueueType, expectedCount int64, ) { - ctx, cancel := context.WithTimeout(s.Context(), 10*time.Second) - defer cancel() - var resp *workflowservice.DescribeTaskQueueResponse var err error - s.Await(func(s *Versioning3Suite) { + s.Awaitf(func(s *Versioning3Suite) { + ctx, cancel := context.WithTimeout(s.Context(), 10*time.Second) + defer cancel() + resp, err = env.FrontendClient().DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{ Namespace: env.Namespace().String(), TaskQueue: tv.TaskQueue(), TaskQueueType: tqType, ReportStats: true, }) - s.NoError(err) - s.NotNil(resp) + s.NoError(err, "DescribeTaskQueue failed: task_queue=%s type=%s rpc_ctx_err=%v await_ctx_err=%v", + tv.TaskQueue().GetName(), tqType, ctx.Err(), s.Context().Err()) + s.NotNil(resp, "DescribeTaskQueue returned nil response: task_queue=%s type=%s", tv.TaskQueue().GetName(), tqType) priorityStats, ok := resp.GetStatsByPriorityKey()[3] - s.True(ok) - s.Equal(expectedCount, priorityStats.GetApproximateBacklogCount()) - }, 6*time.Second, 500*time.Millisecond) + s.True(ok, "DescribeTaskQueue response missing priority 3 stats: task_queue=%s type=%s stats=%v", + tv.TaskQueue().GetName(), tqType, resp.GetStatsByPriorityKey()) + s.Equal(expectedCount, priorityStats.GetApproximateBacklogCount(), + "backlog count mismatch: task_queue=%s type=%s expected=%d stats=%v response=%v", + tv.TaskQueue().GetName(), tqType, expectedCount, priorityStats, resp) + }, 30*time.Second, 500*time.Millisecond, + "validate backlog count: namespace=%s task_queue=%s type=%s expected_count=%d", + env.Namespace(), tv.TaskQueue().GetName(), tqType, expectedCount) } func (s *Versioning3Suite) 
verifyVersioningSAs( @@ -4460,10 +4574,10 @@ func (s *Versioning3Suite) verifyVersioningSAs( executionStatus enumspb.WorkflowExecutionStatus, usedBuilds ...*testvars.TestVars, ) { - ctx, cancel := context.WithTimeout(s.Context(), 10*time.Second) - defer cancel() + s.Awaitf(func(s *Versioning3Suite) { + ctx, cancel := context.WithTimeout(s.Context(), ver3RPCTimeout) + defer cancel() - s.Await(func(s *Versioning3Suite) { var query string if behavior != vbUnspecified { query = fmt.Sprintf("WorkflowId = '%s' AND TemporalWorkerDeployment = '%s' AND TemporalWorkerDeploymentVersion= '%s' AND TemporalWorkflowVersioningBehavior = '%s' AND ExecutionStatus = '%s'", @@ -4476,42 +4590,49 @@ func (s *Versioning3Suite) verifyVersioningSAs( Namespace: env.Namespace().String(), Query: query, }) - s.NoError(err) - s.NotEmpty(resp.GetExecutions()) + s.NoError(err, "ListWorkflowExecutions failed: query=%q rpc_ctx_err=%v await_ctx_err=%v", + query, ctx.Err(), s.Context().Err()) + s.NotEmpty(resp.GetExecutions(), "visibility query returned no executions: query=%q response=%v", query, resp) if len(resp.GetExecutions()) > 0 { w := resp.GetExecutions()[0] if behavior == vbPinned { payload, ok := w.GetSearchAttributes().GetIndexedFields()["BuildIds"] - s.True(ok) + s.True(ok, "BuildIds search attribute missing: query=%q execution=%v", query, w) searchAttrAny, err := sadefs.DecodeValue(payload, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, false) - s.NoError(err) + s.NoError(err, "failed to decode BuildIds search attribute: query=%q execution=%v", query, w) var searchAttr []string if searchAttrAny != nil { searchAttr = searchAttrAny.([]string) } if behavior == enumspb.VERSIONING_BEHAVIOR_PINNED { - s.Contains(searchAttr, worker_versioning.PinnedBuildIdSearchAttribute(tv.DeploymentVersionStringV32())) + s.Contains(searchAttr, worker_versioning.PinnedBuildIdSearchAttribute(tv.DeploymentVersionStringV32()), + "BuildIds search attribute mismatch: query=%q execution=%v search_attr=%v", + query, w, 
searchAttr) } } if len(usedBuilds) > 0 { // Validate TemporalUsedWorkerDeploymentVersions search attribute versionPayload, ok := w.GetSearchAttributes().GetIndexedFields()["TemporalUsedWorkerDeploymentVersions"] - s.True(ok) + s.True(ok, "TemporalUsedWorkerDeploymentVersions search attribute missing: query=%q execution=%v", query, w) versionAttrAny, err := sadefs.DecodeValue(versionPayload, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, false) - s.NoError(err) + s.NoError(err, "failed to decode TemporalUsedWorkerDeploymentVersions search attribute: query=%q execution=%v", query, w) var versionAttr []string if versionAttrAny != nil { versionAttr = versionAttrAny.([]string) } for _, b := range usedBuilds { - s.Contains(versionAttr, b.DeploymentVersionStringV32()) + s.Contains(versionAttr, b.DeploymentVersionStringV32(), + "TemporalUsedWorkerDeploymentVersions mismatch: query=%q execution=%v version_attr=%v expected_used_build=%s", + query, w, versionAttr, b.DeploymentVersionStringV32()) } } fmt.Println(resp.GetExecutions()[0]) } - }, 5*time.Second, 50*time.Millisecond) + }, 30*time.Second, 500*time.Millisecond, + "verify versioning search attributes: namespace=%s workflow_id=%s behavior=%s execution_status=%s used_builds=%v", + env.Namespace(), tv.WorkflowID(), behavior, executionStatus, usedBuilds) } func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() { @@ -4547,9 +4668,7 @@ func (s *Versioning3Suite) TestAutoUpgradeWorkflows_NoBouncingBetweenVersions() s.NoError(err) // Verify that the workflow is running on v1 - s.Await(func(s *Versioning3Suite) { - s.verifyWorkflowVersioning(env, tv1, vbUnpinned, tv1.Deployment(), nil, nil) - }, 10*time.Second, 500*time.Millisecond) + s.verifyWorkflowVersioning(env, tv1, vbUnpinned, tv1.Deployment(), nil, nil) // Start v0 workers to ensure they never receive a task idlePollerCtx, idlePollerCancel := context.WithTimeout(s.Context(), 10*time.Second) @@ -4735,7 +4854,7 @@ func (s *Versioning3Suite) 
TestActivityTQLags_DependentActivityCompletesOnTheNew // Start an idle activity poller on v0. This poller should not receive any activity tasks //nolint:testifylint - go s.idlePollActivity(env, tv0, true, ver3MinPollTime, "activity should not go to the old deployment") + go s.idlePollActivity(s.Context(), env, tv0, true, ver3MinPollTime, "activity should not go to the old deployment") // Start a poller on v1 activityTaskCh := make(chan struct{}, 1) @@ -5370,7 +5489,7 @@ func (s *Versioning3Suite) TestCheckTaskQueueVersionMembership() { }) s.NoError(err) s.False(resp.GetIsMember()) // the check should pass if no version is present - }, 10*time.Second, 500*time.Millisecond) + }, 30*time.Second, 500*time.Millisecond) // Start v1 worker which shall register the version in the task queue w1 := worker.New(env.SdkClient(), tv1.TaskQueue().GetName(), worker.Options{ @@ -5392,23 +5511,13 @@ func (s *Versioning3Suite) TestCheckTaskQueueVersionMembership() { }) s.NoError(err) s.True(resp.GetIsMember()) - }, 10*time.Second, 500*time.Millisecond) + }, 30*time.Second, 500*time.Millisecond) } // validatePinnedVersionExistsInTaskQueue validates that the version, to be pinned, exists in the task queue. // TODO (future improvement): This can be further extended to validate the presence of any version instead of using the GetTaskQueueUserData RPC. 
func (s *Versioning3Suite) validatePinnedVersionExistsInTaskQueue(env *testcore.TestEnv, tv *testvars.TestVars) { - s.Await(func(s *Versioning3Suite) { - resp, err := env.GetTestCluster().MatchingClient().CheckTaskQueueVersionMembership(s.Context(), &matchingservice.CheckTaskQueueVersionMembershipRequest{ - NamespaceId: env.NamespaceID().String(), - TaskQueue: tv.TaskQueue().GetName(), - TaskQueueType: tqTypeWf, - Version: worker_versioning.DeploymentVersionFromDeployment(tv.Deployment()), - }) - s.NoError(err) - s.True(resp.GetIsMember()) - }, 10*time.Second, 500*time.Millisecond) - + s.waitForDeploymentVersionRegistration(env, tv, tqTypeWf) } // TestMaxVersionsInTaskQueue tests that polling from a task queue with too many @@ -5746,7 +5855,7 @@ func (s *Versioning3Suite) testTransitionDuringTransientTask(env *testcore.TestE } // Poll the second activity to cause transition to v1. - s.idlePollActivity(env, tv1, true, ver3MinPollTime, "should not get the activity because it started a transition") + s.idlePollActivity(s.Context(), env, tv1, true, ver3MinPollTime, "should not get the activity because it started a transition") s.verifyWorkflowVersioning(env, tv1, vbUnspecified, nil, nil, tv1.DeploymentVersionTransition()) // Print workflow describe and history @@ -6302,11 +6411,11 @@ func (s *Versioning3Suite) TestStalePartition_RevisionSuppressesTrampolining() { s.verifyWorkflowVersioning(env, tv1, vbPinned, tv1.Deployment(), nil, nil) // Register v2, set v2 as current (revision increments) - s.idlePollWorkflow(env, s.Context(), tv2, true, ver3MinPollTime, "v2 poller registration") + s.pollUntilRegistered(env, tv2) s.setCurrentDeployment(env, tv2) // Register v3, set v3 as current (revision increments again) - s.idlePollWorkflow(env, s.Context(), tv3, true, ver3MinPollTime, "v3 poller registration") + s.pollUntilRegistered(env, tv3) s.setCurrentDeployment(env, tv3) // Trigger WFT — target should be v3 with a high revision @@ -6379,7 +6488,7 @@ func (s 
*Versioning3Suite) TestStalePartition_RevisionSuppressesTrampolining() { // Set a new v4 as current — this produces a revision strictly higher than // the declined revision, simulating an up-to-date partition with fresh data. tv4 := tv1.WithBuildIDNumber(4) - s.idlePollWorkflow(env, s.Context(), tv4, true, ver3MinPollTime, "v4 poller registration") + s.pollUntilRegistered(env, tv4) s.setCurrentDeployment(env, tv4) s.waitForDeploymentDataPropagation(env, tv4, versionStatusCurrent, false, tqTypeWf) diff --git a/tools/testrunner/log.go b/tools/testrunner/log.go index 75fef7ddf1..77b9b85345 100644 --- a/tools/testrunner/log.go +++ b/tools/testrunner/log.go @@ -346,6 +346,9 @@ func parseFailedTestsFromOutput(stdout string) []string { func parseFailureDetails(data string) string { lines := normalizedFailureLines(data) + if block, ok := findLastAwaitFailureBlock(lines); ok { + return block + } // Prefer assertion blocks because they contain the useful testify failure // detail and can be selected from the end while ignoring trailing logs. 
if block, ok := findLastAssertionFailureBlock(lines); ok { @@ -371,6 +374,27 @@ func normalizedFailureLines(data string) []string { return lines } +func findLastAwaitFailureBlock(lines []string) (string, bool) { + for start := len(lines) - 1; start >= 0; start-- { + if !strings.Contains(lines[start], "await stats:") { + continue + } + end := len(lines) + for i := start + 1; i < len(lines); i++ { + if strings.HasPrefix(strings.TrimSpace(lines[i]), goTestFailLinePrefix) { + end = i + 1 + break + } + if strings.TrimSpace(lines[i]) == "FAIL" { + end = i + break + } + } + return strings.Join(lines[start:end], "\n"), true + } + return "", false +} + func findLastAssertionFailureBlock(lines []string) (string, bool) { var failLine string for i := len(lines) - 1; i >= 0; i-- { diff --git a/tools/testrunner/log_test.go b/tools/testrunner/log_test.go index a3ca35f4de..1b0e6d856e 100644 --- a/tools/testrunner/log_test.go +++ b/tools/testrunner/log_test.go @@ -198,6 +198,53 @@ FAIL`, }, notContains: []string{"attempts omitted", "--- attempt 1 ---", "first failure", "--- attempt 2 ---", "penultimate failure", "logger.go", "connection refused"}, }, + { + name: "keeps full await timeout diagnostics", + data: ` require_ctx.go:243: + Error Trace: require_ctx.go:243 + Error: await stats: polls=3 failed_attempts=3 stopped_attempts=3 deadlock_attempts=0 attempt_duration min=1ms avg=2ms max=3ms last=3ms sleep_duration min=100ms avg=100ms max=100ms last=100ms + slowest attempts: #3=3ms #2=2ms #1=1ms + context at timeout: parent_err= await_err=context deadline exceeded deadline_remaining=-1ms + + last failed attempt before timeout: + + --- attempt 3 --- + Error Trace: versioning_3_test.go:10 + Error: context deadline exceeded + + last non-deadline failed attempt: + + --- attempt 1 --- + Error Trace: versioning_3_test.go:10 + Error: Worker Deployment Version not found + + attempt errors: + + --- attempt 1 --- + Error Trace: versioning_3_test.go:10 + Error: Worker Deployment Version not 
found + + --- attempt 2 --- + Error Trace: versioning_3_test.go:10 + Error: context deadline exceeded + + --- attempt 3 --- + Error Trace: versioning_3_test.go:10 + Error: context deadline exceeded + + Requiref: deployment not ready (not satisfied after 90s, 3 polls) +--- FAIL: TestSuite/TestCase (90.00s) +FAIL`, + contains: []string{ + "await stats: polls=3", + "last failed attempt before timeout:", + "last non-deadline failed attempt:", + "Worker Deployment Version not found", + "attempt errors:", + "Requiref: deployment not ready", + "--- FAIL: TestSuite/TestCase (90.00s)", + }, + }, } for _, tt := range tests {