feat: add replay data loading, partitioning, and request building #233
JasonXuDeveloper wants to merge 3 commits into Azure:unstable-replay from
Conversation
| }
|
| baseURL = strings.TrimSuffix(baseURL, "/")
| fullURL := baseURL + apiPath
use https://pkg.go.dev/net/url#JoinPath instead?
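A minimal sketch of that suggestion, assuming the `baseURL` and `apiPath` variables from the snippet above and a `net/url` import; how the error is surfaced depends on the enclosing function, which is not shown in the diff:

```go
// url.JoinPath (Go 1.19+) normalizes duplicate or missing slashes,
// so the manual TrimSuffix is no longer needed.
fullURL, err := url.JoinPath(baseURL, apiPath)
if err != nil {
	return nil, fmt.Errorf("failed to join URL path: %w", err)
}
```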
| avg := float64(total) / float64(runnerCount)
|
| var maxCount, minCount int
| if len(counts) > 0 {
no builtin min/max func?
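For reference, a sketch of what the comparison loop shown further down could look like with Go 1.21+, where `min` and `max` are builtins:

```go
// Assumes len(counts) > 0 has already been checked, as in the original code.
maxCount, minCount := counts[0], counts[0]
for _, c := range counts[1:] {
	maxCount = max(maxCount, c)
	minCount = min(minCount, c)
}
```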
Pull request overview
This PR introduces the core components for the replay feature in kperf, enabling users to replay captured API request traffic against a Kubernetes cluster. The PR is part 4 of a 6-part feature stack and builds upon the shared metrics utility refactoring from PR #232.
Changes:
- Adds replay data types (ReplayRequest, ReplayProfile, ReplayProfileSpec) with comprehensive validation
- Implements profile loading from files, URLs, or PVC mounts with automatic gzip decompression
- Provides request partitioning logic using FNV-1a hashing to distribute load across runners while preserving per-object operation ordering
- Implements HTTP request building and execution using Kubernetes rest.Interface
- Refactors metrics reporting into a shared utility function to enable code reuse
- Updates RunnerGroupSpec to support replay mode configuration
- Includes sample configuration files and comprehensive test coverage
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| api/types/replay.go | New replay data types with validation logic |
| api/types/replay_test.go | Comprehensive tests for replay types including validation, ObjectKey, and Duration methods |
| api/types/runner_group.go | Adds ReplayProfile and ReplayPVCName fields, IsReplayMode helper method |
| api/types/load_traffic.go | Typo fix: "traget" → "target" |
| replay/loader.go | Profile loading from files/URLs with gzip auto-detection |
| replay/loader_test.go | Tests for profile parsing, gzip handling, and error cases |
| replay/partition.go | Request partitioning using consistent hashing with distribution analysis |
| replay/partition_test.go | Tests for partitioning logic, ordering preservation, and edge cases |
| replay/builder.go | HTTP request building with URL masking for metrics aggregation |
| replay/builder_test.go | Tests for verb mapping, URL building, query parameters, and special cases |
| metrics/utils.go | Extracted shared BuildPercentileLatenciesReport utility function |
| cmd/kperf/commands/runner/runner.go | Typo fix ("letencies" → "latencies"), adds specs validation, refactors to use shared metrics utility |
| cmd/kperf/commands/runnergroup/run.go | Only overrides nodeAffinity if CLI flag is provided |
| testdata/sample_replay.yaml | Sample replay profile demonstrating Pod lifecycle operations |
| testdata/sample_replay_runnergroup.yaml | Sample RunnerGroupSpec for replay mode |
| func CountByRunner(requests []types.ReplayRequest, runnerCount int) []int {
| 	counts := make([]int, runnerCount)
| 	for _, req := range requests {
| 		idx := assignRunner(req.ObjectKey(), runnerCount)
| 		counts[idx]++
| 	}
| 	return counts
| }
The CountByRunner function doesn't validate its runnerCount parameter. If runnerCount is 0 or negative, calling assignRunner will cause a divide-by-zero panic at line 40 when computing the modulo operation.
Consider adding validation similar to PartitionRequests (lines 16-21) to return nil or an empty slice for invalid runnerCount values.
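A sketch of the suggested guard, mirroring the PartitionRequests-style validation the comment refers to:

```go
// CountByRunner returns how many requests each runner would receive.
// Returning nil for a non-positive runnerCount avoids the divide-by-zero
// panic inside assignRunner.
func CountByRunner(requests []types.ReplayRequest, runnerCount int) []int {
	if runnerCount <= 0 {
		return nil
	}
	counts := make([]int, runnerCount)
	for _, req := range requests {
		counts[assignRunner(req.ObjectKey(), runnerCount)]++
	}
	return counts
}
```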
| // AnalyzeDistribution returns statistics about request distribution across runners.
| func AnalyzeDistribution(requests []types.ReplayRequest, runnerCount int) map[string]interface{} {
| 	counts := CountByRunner(requests, runnerCount)
|
| 	total := len(requests)
| 	avg := float64(total) / float64(runnerCount)
|
| 	var maxCount, minCount int
| 	if len(counts) > 0 {
| 		maxCount = counts[0]
| 		minCount = counts[0]
| 	}
|
| 	for _, c := range counts {
| 		if c > maxCount {
| 			maxCount = c
| 		}
| 		if c < minCount {
| 			minCount = c
| 		}
| 	}
|
| 	imbalance := 0.0
| 	if avg > 0 {
| 		imbalance = float64(maxCount-minCount) / avg * 100 // % imbalance
| 	}
|
| 	return map[string]interface{}{
| 		"total":       total,
| 		"runnerCount": runnerCount,
| 		"average":     avg,
| 		"min":         minCount,
| 		"max":         maxCount,
| 		"imbalance":   imbalance,
| 		"perRunner":   counts,
| 	}
| }
The AnalyzeDistribution function doesn't validate its runnerCount parameter. If runnerCount is 0, line 59 will perform a division by zero, resulting in an Inf value for avg. Additionally, if runnerCount is negative, CountByRunner (line 56) will panic.
Consider adding validation to handle edge cases gracefully, such as returning an error map or validating inputs at the start of the function.
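One way the guard could look at the top of AnalyzeDistribution; the exact return shape for invalid input is a judgment call, not something the PR specifies:

```go
// Reject non-positive runnerCount before any division or hashing happens.
if runnerCount <= 0 {
	return map[string]interface{}{
		"error": fmt.Sprintf("runnerCount must be positive, got %d", runnerCount),
	}
}
```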
The AnalyzeDistribution function is exported but has no test coverage. This function has several edge cases (zero/negative runnerCount, empty requests) that should be tested to ensure it behaves correctly.
Consider adding test coverage for this function, including tests for edge cases like empty request lists and invalid runnerCount values.
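A minimal test sketch for the empty-input case, assuming it lives in the replay package next to partition_test.go; the invalid-runnerCount case would depend on whichever validation the earlier comments settle on:

```go
package replay

import "testing"

func TestAnalyzeDistributionEmpty(t *testing.T) {
	stats := AnalyzeDistribution(nil, 3)
	if got := stats["total"].(int); got != 0 {
		t.Errorf("expected total 0, got %d", got)
	}
	if got := stats["max"].(int); got != 0 {
		t.Errorf("expected max 0, got %d", got)
	}
	if got := len(stats["perRunner"].([]int)); got != 3 {
		t.Errorf("expected 3 per-runner buckets, got %d", got)
	}
}
```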
| // fetchFromURL fetches content from an HTTP/HTTPS URL.
| func fetchFromURL(ctx context.Context, url string) (io.ReadCloser, error) {
| 	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
| 	if err != nil {
| 		return nil, fmt.Errorf("failed to create request: %w", err)
| 	}
|
| 	resp, err := http.DefaultClient.Do(req)
| 	if err != nil {
| 		return nil, fmt.Errorf("failed to fetch URL: %w", err)
| 	}
|
| 	if resp.StatusCode != http.StatusOK {
| 		resp.Body.Close()
| 		return nil, fmt.Errorf("HTTP error: %s", resp.Status)
| 	}
|
| 	return resp.Body, nil
| }
Using http.DefaultClient without a configured timeout can cause requests to hang indefinitely if the server is unresponsive. While the context is passed to the request, the DefaultClient's underlying transport may not properly honor context cancellation in all scenarios (e.g., during connection establishment).
Consider creating a custom http.Client with an appropriate timeout or using the context to enforce a deadline. For example:
- Set a reasonable default timeout on the Client (e.g., 30-60 seconds)
- Or add a timeout parameter that can be configured
This is particularly important for production use cases where network issues could cause long hangs.
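A sketch of the first option, using a package-level client; the 30-second value is an assumption, not something specified in the PR:

```go
// profileHTTPClient bounds the whole fetch (including reading the body,
// since resp.Body is returned to the caller) so an unresponsive server
// cannot hang the loader indefinitely.
var profileHTTPClient = &http.Client{Timeout: 30 * time.Second}

// fetchFromURL fetches content from an HTTP/HTTPS URL.
func fetchFromURL(ctx context.Context, url string) (io.ReadCloser, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}

	resp, err := profileHTTPClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch URL: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("HTTP error: %s", resp.Status)
	}

	return resp.Body, nil
}
```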
| type RunnerGroupSpec struct {
| 	// Count is the number of runners.
| 	Count int32 `json:"count" yaml:"count"`
| 	// Profile defines what the load traffic looks like.
| 	Profile *LoadProfile `json:"loadProfile,omitempty" yaml:"loadProfile"`
| 	// ReplayProfile defines the source for replay mode profile (URL or PVC path).
| 	// When set, runners execute in replay mode instead of load profile mode.
| 	// Supports: http(s):// URLs or local paths (e.g., /mnt/profile/replay.yaml.gz)
| 	ReplayProfile *string `json:"replayProfile,omitempty" yaml:"replayProfile,omitempty"`
| 	// NodeAffinity defines how to deploy runners into dedicated nodes
| 	// which have specific labels.
| 	NodeAffinity map[string][]string `json:"nodeAffinity,omitempty" yaml:"nodeAffinity,omitempty"`
| 	// ServiceAccount is the name of the ServiceAccount to use to run runners.
| 	ServiceAccount *string `json:"serviceAccount,omitempty" yaml:"serviceAccount,omitempty"`
| 	// OwnerReference is to mark the runner group depending on this object.
| 	//
| 	// FORMAT: APIVersion:Kind:Name:UID
| 	OwnerReference *string `json:"ownerReference,omitempty" yaml:"ownerReference,omitempty"`
| 	// ReplayPVCName is the name of the PVC to mount for replay profile.
| 	// Only used when ReplayProfile is a local path (not URL).
| 	ReplayPVCName *string `json:"replayPVCName,omitempty" yaml:"replayPVCName,omitempty"`
| }
The RunnerGroupSpec allows both Profile and ReplayProfile to be set simultaneously, creating an ambiguous configuration. There's no validation to ensure that only one mode is active.
Consider adding a Validate() method for RunnerGroupSpec that ensures:
- At least one of Profile or ReplayProfile is set
- Both are not set at the same time
- If ReplayProfile is a local path (not http/https), ReplayPVCName must be set
This would prevent configuration errors and provide clear feedback to users.
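A sketch of such a Validate method, assuming fmt and strings imports; the rules and error messages come from this review comment rather than from the PR itself:

```go
// Validate checks that exactly one of load-profile mode or replay mode is
// configured, and that a PVC is named whenever the replay profile is a local path.
func (s RunnerGroupSpec) Validate() error {
	hasLoad := s.Profile != nil
	hasReplay := s.ReplayProfile != nil && *s.ReplayProfile != ""

	switch {
	case !hasLoad && !hasReplay:
		return fmt.Errorf("one of loadProfile or replayProfile must be set")
	case hasLoad && hasReplay:
		return fmt.Errorf("loadProfile and replayProfile are mutually exclusive")
	}

	if hasReplay &&
		!strings.HasPrefix(*s.ReplayProfile, "http://") &&
		!strings.HasPrefix(*s.ReplayProfile, "https://") {
		// Local path: the profile file has to come from a mounted PVC.
		if s.ReplayPVCName == nil || *s.ReplayPVCName == "" {
			return fmt.Errorf("replayPVCName is required when replayProfile is a local path")
		}
	}
	return nil
}
```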
| func (r ReplayRequest) ObjectKey() string {
| 	if r.Namespace == "" {
| 		return fmt.Sprintf("/%s/%s", r.ResourceKind, r.Name)
| 	}
| 	return fmt.Sprintf("%s/%s/%s", r.Namespace, r.ResourceKind, r.Name)
| }
The ObjectKey implementation has an issue with LIST/WATCH operations. For these operations, the Name field is empty (as specified in line 18 comments and enforced by validation), which means ObjectKey() will return keys like "default/Pod/" with a trailing empty name segment.
This causes all LIST/WATCH requests for the same namespace and resource kind to hash to identical keys, routing them all to the same runner. This defeats the purpose of load distribution for LIST/WATCH operations.
Consider including additional distinguishing information in the key for operations without a name, such as the labelSelector or a unique identifier, or handle LIST/WATCH operations specially in the partitioning logic to achieve better distribution.
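One possible shape of that change, purely illustrative: the LabelSelector field is hypothetical (ReplayRequest's full field list isn't shown here), and LIST/WATCH handling could just as well live in the partitioning logic instead:

```go
// ObjectKey returns the key used for consistent hashing. For requests without
// a Name (LIST/WATCH), fall back to a hypothetical distinguishing field so
// such requests do not all collapse onto a single runner.
func (r ReplayRequest) ObjectKey() string {
	name := r.Name
	if name == "" {
		name = r.LabelSelector // hypothetical field, for illustration only
	}
	if r.Namespace == "" {
		return fmt.Sprintf("/%s/%s", r.ResourceKind, name)
	}
	return fmt.Sprintf("%s/%s/%s", r.Namespace, r.ResourceKind, name)
}
```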
Add foundation types for the timeseries replay system:
- ReplayRequest, ReplayProfile, ReplayProfileSpec types with validation
- IsReplayMode() method on RunnerGroupSpec for detecting replay configs
- ReplayProfileSpec field in RunnerGroupSpec for distributed mode
- Sample replay profile and runner group config test data

Signed-off-by: JasonXuDeveloper - 傑 <jason@xgamedev.net>

Extract shared report-building logic into metrics.BuildPercentileLatenciesReport() to avoid duplication between runner and replay report builders. Refactor buildRunnerMetricReport() to use the shared utility.

Signed-off-by: JasonXuDeveloper - 傑 <jason@xgamedev.net>

Core replay data processing (no execution engine yet):
- Loader: YAML/gzip profile loading from file or URL
- Partition: request distribution across runners using object-key consistent hashing to preserve per-object ordering
- Builder: HTTP request construction with URL building, verb mapping, and URL masking for metrics aggregation

Signed-off-by: JasonXuDeveloper - 傑 <jason@xgamedev.net>
Summary
Test plan
- `go build ./...` passes
- `go vet ./...` passes
- `go test ./replay/...` passes (loader, partition, builder tests)