HYPERFLEET-583 - feat: add dead man's switch metrics for silent failure detection #61
xueli181114 wants to merge 1 commit into openshift-hyperfleet:main
Conversation
Add three Prometheus gauge metrics to detect silent failures where the adapter is running but not processing messages:

- hyperfleet_adapter_last_message_processed_timestamp (dead man's switch)
- hyperfleet_adapter_last_message_success_timestamp
- hyperfleet_adapter_last_message_failure_timestamp

Metrics are recorded via an onResult callback on CreateHandler, keeping the executor free of metrics concerns. The MetricsServer now uses a dedicated Prometheus registry with Go/process collectors.

Made-with: Cursor
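The dead man's switch idea behind the three gauges can be sketched with the standard library alone. This is a hypothetical simplification for illustration: the PR itself uses Prometheus gauges in a dedicated registry, and only the metric names below are taken from the PR; the `MetricsRecorder` type and its rendering are assumptions.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// MetricsRecorder is a stdlib-only stand-in for the PR's MetricsServer:
// each field holds a unix timestamp stored when the corresponding event fires.
type MetricsRecorder struct {
	lastProcessed atomic.Int64 // dead man's switch: updated on every processed message
	lastSuccess   atomic.Int64 // updated only on successful execution
	lastFailure   atomic.Int64 // updated only on failed execution
}

func (m *MetricsRecorder) RecordMessageProcessed() { m.lastProcessed.Store(time.Now().Unix()) }
func (m *MetricsRecorder) RecordMessageSuccess()   { m.lastSuccess.Store(time.Now().Unix()) }
func (m *MetricsRecorder) RecordMessageFailure()   { m.lastFailure.Store(time.Now().Unix()) }

// Render emits the three gauges in Prometheus text exposition format.
func (m *MetricsRecorder) Render() string {
	return fmt.Sprintf(
		"hyperfleet_adapter_last_message_processed_timestamp %d\n"+
			"hyperfleet_adapter_last_message_success_timestamp %d\n"+
			"hyperfleet_adapter_last_message_failure_timestamp %d\n",
		m.lastProcessed.Load(), m.lastSuccess.Load(), m.lastFailure.Load())
}

func main() {
	m := &MetricsRecorder{}
	m.RecordMessageProcessed()
	m.RecordMessageSuccess()
	fmt.Print(m.Render())
}
```

A scraper that sees the processed timestamp stop advancing knows the adapter is alive but not consuming, which is exactly the silent failure the PR targets.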
[APPROVALNOTIFIER] This PR is NOT APPROVED.

This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
Walkthrough

This PR introduces metrics instrumentation for message processing by implementing a post-processing callback mechanism in the event handler. The executor's CreateHandler method is updated to accept optional callbacks that fire after each event execution completes. A new metrics infrastructure is added to the health package with per-server Prometheus registries and three new timestamp gauges (last processed, last success, last failure) to track message processing milestones. The main entry point is modified to supply a callback that records these metrics. Supporting tests are added to verify callback invocation, metrics recording behavior, and the integrated lifecycle.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Adapter as Adapter Main
    participant Handler as Event Handler
    participant Executor as Executor
    participant Callback as Metrics Callback
    participant Metrics as MetricsServer

    Adapter->>Handler: Event arrives
    Handler->>Executor: Execute event
    Executor-->>Handler: ExecutionResult
    Handler->>Callback: onResult(ExecutionResult)
    Callback->>Metrics: RecordMessageProcessed()
    alt Execution Success
        Callback->>Metrics: RecordMessageSuccess()
    else Execution Failed
        Callback->>Metrics: RecordMessageFailure()
    end
    Metrics-->>Callback: Timestamps updated
    Callback-->>Handler: Callback completes
    Handler-->>Adapter: ACK event
```
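The flow in the diagram above can be sketched in Go. The `ExecutionResult`, status constants, and `CreateHandler` signature below are assumptions modeled on the PR description, not the actual project code:

```go
package main

import "fmt"

// Status and ExecutionResult are hypothetical stand-ins for the executor's types.
type Status int

const (
	StatusSuccess Status = iota
	StatusFailed
)

type ExecutionResult struct{ Status Status }

// CreateHandler returns a handler that runs execute, then fires each optional
// onResult callback with the result, once per event. Callbacks observe the
// result but cannot change what the handler returns.
func CreateHandler(execute func(data []byte) ExecutionResult,
	onResult ...func(ExecutionResult)) func([]byte) ExecutionResult {
	return func(data []byte) ExecutionResult {
		result := execute(data)
		for _, fn := range onResult {
			fn(result)
		}
		return result
	}
}

func main() {
	processed, successes, failures := 0, 0, 0
	handler := CreateHandler(
		func(data []byte) ExecutionResult {
			if len(data) == 0 {
				return ExecutionResult{Status: StatusFailed}
			}
			return ExecutionResult{Status: StatusSuccess}
		},
		// Metrics callback, mirroring the diagram: always record processed,
		// then branch on success vs failure.
		func(r ExecutionResult) {
			processed++
			if r.Status == StatusSuccess {
				successes++
			} else {
				failures++
			}
		},
	)
	handler([]byte("event"))
	handler(nil)
	fmt.Println(processed, successes, failures) // 2 1 1
}
```

The variadic parameter keeps existing call sites compiling unchanged while letting main wire in the metrics callback.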
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks: ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
🧹 Nitpick comments (2)
pkg/health/metrics_test.go (1)
Lines 157-183: Potential port conflict in parallel test execution.

The lifecycle test binds to a hardcoded port (19090). If tests run in parallel (`go test -parallel`), this could cause flaky failures. Consider using port 0 and extracting the assigned port, or use `t.Parallel()` guards appropriately.

💡 Suggested improvement using dynamic port
```diff
 func TestMetricsServer_Lifecycle(t *testing.T) {
-	port := "19090"
+	port := "0" // Let OS assign available port
 	ms := NewMetricsServer(&mockLogger{}, port, MetricsConfig{
 		Component: "lifecycle-test",
 		Version:   "v0.0.1",
 		Commit:    "def456",
 	})
 	ctx := context.Background()
 	err := ms.Start(ctx)
 	require.NoError(t, err)

 	time.Sleep(50 * time.Millisecond)

 	ms.RecordMessageProcessed()
 	ms.RecordMessageSuccess()

-	resp, err := http.Get("http://localhost:" + port + "/metrics")
+	// Extract actual port from server.Addr after Start()
+	addr := ms.server.Addr
+	resp, err := http.Get("http://localhost" + addr + "/metrics")
```

Note: This would require exposing the actual bound address from the server, which may need additional implementation.
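Independent of how MetricsServer ends up exposing it, the ephemeral-port technique itself is standard: listen on port 0 and read the bound address back from the listener. A minimal stdlib sketch (the helper name is illustrative):

```go
package main

import (
	"fmt"
	"net"
)

// listenEphemeral binds to an OS-assigned port and returns the listener plus
// the concrete port, so tests never hardcode (and never collide on) a port.
func listenEphemeral() (net.Listener, int, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0") // port 0 = let the OS pick
	if err != nil {
		return nil, 0, err
	}
	port := ln.Addr().(*net.TCPAddr).Port
	return ln, port, nil
}

func main() {
	ln, port, err := listenEphemeral()
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	fmt.Println(port > 0) // true
}
```

A server built this way would hand the listener to `http.Server.Serve` and expose the port through something like the reviewer's suggested `BoundAddr()` accessor.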
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/health/metrics_test.go` around lines 157-183: the test TestMetricsServer_Lifecycle uses a hardcoded port ("19090") which can conflict in parallel runs; change the test to request an ephemeral port by passing "0" (or empty) to NewMetricsServer and update the MetricsServer implementation (NewMetricsServer/Start) to listen on port 0 and expose the actual bound address/port (e.g., add a method BoundAddr() or make Start return the listen address) so the test can fetch the real /metrics URL (http://localhost:<boundPort>/metrics); update the test to read that bound port from ms.BoundAddr() (or Start's result) before issuing http.Get, ensuring no port collisions in parallel tests.

internal/executor/executor.go (1)
Lines 239-274: LGTM! Clean callback pattern for post-processing hooks.

The variadic onResult callback design is well-suited for the metrics use case. The documentation clearly states that callbacks cannot influence the handler's return value.

One consideration: callbacks are invoked synchronously, so a slow or panicking callback could affect message processing. For the current metrics use case (lightweight gauge updates), this is acceptable. If heavier callbacks are added in the future, consider:

- Adding panic recovery around the callback loop
- Documenting the expectation that callbacks should be fast and non-blocking
💡 Optional: Add panic recovery for robustness
```diff
 result := e.Execute(ctx, evt.Data())
 for _, fn := range onResult {
+	func() {
+		defer func() {
+			if r := recover(); r != nil {
+				e.log.Errorf(ctx, "onResult callback panicked: %v", r)
+			}
+		}()
 		fn(result)
+	}()
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/executor/executor.go` around lines 239 - 274, The onResult callback loop in Executor.CreateHandler can crash the handler if a callback panics; wrap each invocation of the callbacks (the for _, fn := range onResult { fn(result) } loop) with panic recovery so a panicking callback is recovered and logged via e.log (e.g., e.log.Errorf(ctx, ...)) and processing continues; ensure recovery is per-callback (so one bad callback doesn't skip others) and keep the callbacks invoked synchronously as before.
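The per-callback recovery the reviewer suggests can be factored into a helper. A stdlib sketch, with logging simplified to an injected function where the real code would use `e.log.Errorf`:

```go
package main

import "fmt"

// safeInvoke runs fn, converting a panic into a logged message so one
// misbehaving callback cannot crash the handler or skip later callbacks.
func safeInvoke(fn func(), logf func(format string, args ...any)) {
	defer func() {
		if r := recover(); r != nil {
			logf("onResult callback panicked: %v", r)
		}
	}()
	fn()
}

func main() {
	var logged []string
	logf := func(format string, args ...any) {
		logged = append(logged, fmt.Sprintf(format, args...))
	}
	callbacks := []func(){
		func() { panic("boom") }, // a bad callback...
		func() { fmt.Println("second callback still ran") },
	}
	for _, cb := range callbacks { // ...does not stop the loop
		safeInvoke(cb, logf)
	}
	fmt.Println(len(logged)) // 1
}
```

Because recovery is scoped to each invocation, one panicking callback is logged and skipped while subsequent callbacks still run, matching the reviewer's "per-callback" requirement.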
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- cmd/adapter/main.go
- internal/executor/executor.go
- internal/executor/executor_test.go
- pkg/health/metrics.go
- pkg/health/metrics_test.go
Summary
- hyperfleet_adapter_last_message_processed_timestamp — dead man's switch, updated on every processed CloudEvent
- hyperfleet_adapter_last_message_success_timestamp — updated only on successful execution
- hyperfleet_adapter_last_message_failure_timestamp — updated only on failed execution
- CreateHandler extended with an optional onResult callback so the caller can observe the ExecutionResult without coupling the executor to metrics
- MetricsServer updated to use a dedicated Prometheus registry with Go/process collectors

Test plan
- /metrics endpoint exposes all metrics including Go/process collectors
- CreateHandler callback: receives StatusSuccess on valid event, StatusFailed on invalid event, no-op when no callback provided
- go build, go vet, golangci-lint all clean

Jira: https://issues.redhat.com/browse/HYPERFLEET-583
Made with Cursor
Summary by CodeRabbit
New Features
Tests