From d7a182524e4834ed63d05db138c066f37ac0fbe9 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Fri, 27 Mar 2026 10:48:06 -0400 Subject: [PATCH 01/10] feat: Add internal event processing OTEL metrics Establish an EventMetrics interface pattern for reporting internal event processing telemetry via OpenTelemetry. Each subsystem defines its own interface; the metrics package provides an OTEL-backed implementation (EventMetricsRecorder) that satisfies both the ld-relay and go-sdk-events interfaces via Go structural typing. New metrics under the events.received namespace: - events.received.bytes: bytes of event data received from SDKs (renamed from events.ingested.bytes) New metrics under the events.sent namespace: - events.sent.count: events successfully delivered to LaunchDarkly - events.sent.bytes: payload bytes delivered (pre-compression) - events.sent.failures: events in batches that failed after retries - events.sent.dropped: events discarded due to capacity overflow - events.sent.pending: current number of events buffered (gauge) All metrics are recorded from both the verbatim relay (HTTPEventPublisher) and the summarizing relay (go-sdk-events DefaultEventProcessor) paths. A temporary go.mod replace directive points to the local go-sdk-events worktree for development. 
--- go.mod | 20 +-- go.sum | 41 ----- internal/events/event-relay.go | 20 ++- internal/events/event-relay_test.go | 1 + internal/events/event_metrics.go | 39 +++++ internal/events/event_publisher.go | 48 +++++- internal/events/event_publisher_test.go | 142 ++++++++++++++++++ internal/events/summarizing-relay.go | 2 + internal/metrics/constants.go | 8 +- internal/metrics/measures.go | 93 +++++++++++- internal/metrics/metrics.go | 26 +++- internal/metrics/metrics_test.go | 2 +- internal/metrics/test_utils_test.go | 2 +- .../middleware/metrics_middleware_test.go | 4 +- internal/relayenv/env_context_impl.go | 53 ++++--- 15 files changed, 407 insertions(+), 94 deletions(-) create mode 100644 internal/events/event_metrics.go diff --git a/go.mod b/go.mod index d1acff31..06edf810 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/launchdarkly/ld-relay/v8 go 1.25.0 +replace github.com/launchdarkly/go-sdk-events/v3 => /home/mkeeler/code/launchdarkly/go-sdk-events.git/enhanced-telemetry + require ( github.com/armon/go-metrics v0.4.1 // indirect github.com/aws/aws-sdk-go-v2 v1.24.1 @@ -33,7 +35,6 @@ require ( github.com/launchdarkly/go-server-sdk/v7 v7.14.3 github.com/launchdarkly/go-test-helpers/v3 v3.1.0 github.com/pborman/uuid v1.2.1 - github.com/prometheus/client_golang v1.23.2 // override to address CVE-2022-21698 github.com/stretchr/testify v1.11.1 golang.org/x/sync v0.19.0 gopkg.in/gcfg.v1 v1.2.3 @@ -58,21 +59,16 @@ require ( github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b github.com/klauspost/compress v1.18.0 github.com/launchdarkly/api-client-go/v13 v13.0.1-0.20230420175109-f5469391a13e + go.opentelemetry.io/contrib/instrumentation/runtime v0.67.0 go.opentelemetry.io/otel v1.42.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 - 
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 - go.opentelemetry.io/otel/exporters/prometheus v0.62.0 go.opentelemetry.io/otel/metric v1.42.0 go.opentelemetry.io/otel/sdk v1.42.0 go.opentelemetry.io/otel/sdk/metric v1.42.0 - go.opentelemetry.io/otel/trace v1.42.0 - google.golang.org/grpc v1.78.0 ) require ( - github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -99,19 +95,12 @@ require ( github.com/miekg/dns v1.1.58 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/gomega v1.27.10 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.67.5 // indirect - github.com/prometheus/otlptranslator v1.0.0 // indirect - github.com/prometheus/procfs v0.19.2 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/contrib/instrumentation/runtime v0.67.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect + go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect - go.yaml.in/yaml/v2 v2.4.3 // indirect golang.org/x/crypto v0.47.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect @@ -119,6 +108,7 @@ require ( golang.org/x/text v0.33.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect + google.golang.org/grpc v1.78.0 // indirect google.golang.org/protobuf v1.36.11 // 
indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 01ee6ba6..e2262dd8 100644 --- a/go.sum +++ b/go.sum @@ -78,7 +78,6 @@ github.com/aws/smithy-go v1.19.0 h1:KWFKQV80DpP3vJrrA9sVAHQ5gc2z8i4EzrLhLlWXcBM= github.com/aws/smithy-go v1.19.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= @@ -265,8 +264,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= -github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/launchdarkly/api-client-go/v13 v13.0.1-0.20230420175109-f5469391a13e h1:PZ8SXmC5B/jTc8FfrWfSGNjy0ieGwqcKPjPV4vMtUqM= github.com/launchdarkly/api-client-go/v13 v13.0.1-0.20230420175109-f5469391a13e/go.mod h1:cQRkOAs0LGcfIs6RSsHNqwhzItUZooyhpqPv0hgiQZM= github.com/launchdarkly/ccache v1.1.0 h1:voD1M+ZJXR3MREOKtBwgTF9hYHl1jg+vFKS/+VAkR2k= @@ -283,8 +280,6 @@ github.com/launchdarkly/go-ntlmssp v1.0.3 h1:rFxOnnEJ2DzJ+NU0plhXqnldJUwn3wWJFTW github.com/launchdarkly/go-ntlmssp v1.0.3/go.mod h1:P1z6fX/y9zgBvfnZP7AKWilW9AX5M3czsa1S4Zpp2nM= 
github.com/launchdarkly/go-sdk-common/v3 v3.4.0 h1:GTRulE0G43xdWY1QdjAXJ7QnZ8PMFU8pOWZICCydEtM= github.com/launchdarkly/go-sdk-common/v3 v3.4.0/go.mod h1:6MNeeP8b2VtsM6I3TbShCHW/+tYh2c+p5dB+ilS69sg= -github.com/launchdarkly/go-sdk-events/v3 v3.5.0 h1:Yav8Thm70dZbO8U1foYwZPf3w60n/lNBRaYeeNM/qg4= -github.com/launchdarkly/go-sdk-events/v3 v3.5.0/go.mod h1:oepYWQ2RvvjfL2WxkE1uJJIuRsIMOP4WIVgUpXRPcNI= github.com/launchdarkly/go-semver v1.0.3 h1:agIy/RN3SqeQDIfKkl+oFslEdeIs7pgsJBs3CdCcGQM= github.com/launchdarkly/go-semver v1.0.3/go.mod h1:xFmMwXba5Mb+3h72Z+VeSs9ahCvKo2QFUTHRNHVqR28= github.com/launchdarkly/go-server-sdk-consul/v3 v3.0.0 h1:AXmmU4rsMxdA75o4a9p+7Pl3SzdfUCLIw7CM7pBRifE= @@ -333,8 +328,6 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -361,25 +354,15 @@ github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSg github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.4.0/go.mod 
h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= -github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= -github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= -github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= -github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= -github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= -github.com/prometheus/otlptranslator v1.0.0 h1:s0LJW/iN9dkIH+EnhiD3BlkkP5QVIUVEoIwkU+A6qos= -github.com/prometheus/otlptranslator v1.0.0/go.mod h1:vRYWnXvI6aWGpsdY/mOT/cbeVRBlPWtBNDb7kGR3uKM= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= -github.com/prometheus/procfs v0.19.2 h1:zUMhqEW66Ex7OXIiDkll3tl9a1ZdilUOd/F6ZXw4Vws= -github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05ZpYlu+b4J7mw= github.com/rogpeppe/go-internal v1.3.0/go.mod 
h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= @@ -421,44 +404,22 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/runtime v0.67.0 h1:fM78cKITJ2r08cl+nw5i+hI9zWAu3iak8o1Os/ca2Ck= go.opentelemetry.io/contrib/instrumentation/runtime v0.67.0/go.mod h1:ybmlzIqGcQzwt5lAfi8TpSnHo/CI3yv1Czodmm+OJa8= -go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= -go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 h1:NOyNnS19BF2SUDApbOKbDtWZ0IK7b8FJ2uAGdIWOGb0= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0/go.mod h1:VL6EgVikRLcJa9ftukrHu/ZkkhFBSo1lzvdBC9CF1ss= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0 h1:9y5sHvAxWzft1WQ4BwqcvA+IFVUJ1Ya75mSAUnFEVwE= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0/go.mod h1:eQqT90eR3X5Dbs1g9YSM30RavwLF725Ris5/XSXWvqE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 h1:DvJDOPmSWQHWywQS6lKL+pb8s3gBLOZUtw4N+mavW1I= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0/go.mod h1:EtekO9DEJb4/jRyN4v4Qjc2yA7AtfCBuz2FynRUWTXs= 
-go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 h1:wVZXIWjQSeSmMoxF74LzAnpVQOAFDo3pPji9Y4SOFKc= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0/go.mod h1:khvBS2IggMFNwZK/6lEeHg/W57h/IX6J4URh57fuI40= -go.opentelemetry.io/otel/exporters/prometheus v0.62.0 h1:krvC4JMfIOVdEuNPTtQ0ZjCiXrybhv+uOHMfHRmnvVo= -go.opentelemetry.io/otel/exporters/prometheus v0.62.0/go.mod h1:fgOE6FM/swEnsVQCqCnbOfRV4tOnWPg7bVeo4izBuhQ= -go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= -go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI= -go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= -go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo= go.opentelemetry.io/otel/sdk v1.42.0/go.mod h1:rGHCAxd9DAph0joO4W6OPwxjNTYWghRWmkHuGbayMts= -go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= -go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA= go.opentelemetry.io/otel/sdk/metric v1.42.0/go.mod h1:Ua6AAlDKdZ7tdvaQKfSmnFTdHx37+J4ba8MwVCYM5hc= -go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= -go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= go.opentelemetry.io/proto/otlp v1.9.0 
h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= -go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -597,8 +558,6 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/internal/events/event-relay.go b/internal/events/event-relay.go index d7f2506d..361a0d01 100644 --- a/internal/events/event-relay.go +++ b/internal/events/event-relay.go @@ -45,6 +45,7 @@ type analyticsEventEndpointDispatcher struct { summarizingRelay *eventSummarizingRelay wrapper *datadestination.DataDestinationWrapper eventQueueCleanupInterval time.Duration + eventMetrics EventMetrics 
loggers ldlog.Loggers mu sync.Mutex } @@ -170,7 +171,11 @@ func (r *analyticsEventEndpointDispatcher) getVerbatimRelay() *eventVerbatimRela r.mu.Lock() defer r.mu.Unlock() if r.verbatimRelay == nil { - r.verbatimRelay = newEventVerbatimRelay(r.authKey, r.config, r.httpConfig, r.loggers, r.remotePath) + var extraOpts []OptionType + if r.eventMetrics != nil { + extraOpts = append(extraOpts, OptionEventMetrics{EventMetrics: r.eventMetrics}) + } + r.verbatimRelay = newEventVerbatimRelay(r.authKey, r.config, r.httpConfig, r.loggers, r.remotePath, extraOpts...) } return r.verbatimRelay } @@ -180,7 +185,7 @@ func (r *analyticsEventEndpointDispatcher) getSummarizingRelay() *eventSummarizi defer r.mu.Unlock() if r.summarizingRelay == nil { r.summarizingRelay = newEventSummarizingRelay(r.config, r.httpConfig, r.authKey, r.wrapper, - r.loggers, r.remotePath, r.eventQueueCleanupInterval) + r.loggers, r.remotePath, r.eventQueueCleanupInterval, r.eventMetrics) } return r.summarizingRelay } @@ -206,11 +211,12 @@ func NewEventDispatcher( httpConfig httpconfig.HTTPConfig, wrapper *datadestination.DataDestinationWrapper, eventQueueCleanupInterval time.Duration, // normally zero to use the default; overridden in tests + eventMetrics EventMetrics, ) *EventDispatcher { ep := &EventDispatcher{ analyticsEndpoints: map[basictypes.SDKKind]*analyticsEventEndpointDispatcher{ basictypes.ServerSDK: newAnalyticsEventEndpointDispatcher(sdkKey, - config, httpConfig, wrapper, loggers, "/bulk", eventQueueCleanupInterval), + config, httpConfig, wrapper, loggers, "/bulk", eventQueueCleanupInterval, eventMetrics), }, diagnosticEndpoints: map[basictypes.SDKKind]*diagnosticEventEndpointDispatcher{ basictypes.ServerSDK: newDiagnosticEventEndpointDispatcher(config, httpConfig, loggers, "/diagnostic"), @@ -218,12 +224,12 @@ func NewEventDispatcher( } if mobileKey.Defined() { ep.analyticsEndpoints[basictypes.MobileSDK] = newAnalyticsEventEndpointDispatcher(mobileKey, - config, httpConfig, wrapper, loggers, 
"/mobile", eventQueueCleanupInterval) + config, httpConfig, wrapper, loggers, "/mobile", eventQueueCleanupInterval, eventMetrics) ep.diagnosticEndpoints[basictypes.MobileSDK] = newDiagnosticEventEndpointDispatcher(config, httpConfig, loggers, "/mobile/events/diagnostic") } if envID.Defined() { ep.analyticsEndpoints[basictypes.JSClientSDK] = newAnalyticsEventEndpointDispatcher(envID, config, httpConfig, wrapper, loggers, - "/events/bulk/"+string(envID), eventQueueCleanupInterval) + "/events/bulk/"+string(envID), eventQueueCleanupInterval, eventMetrics) ep.diagnosticEndpoints[basictypes.JSClientSDK] = newDiagnosticEventEndpointDispatcher(config, httpConfig, loggers, "/events/diagnostic/"+string(envID)) } @@ -280,6 +286,7 @@ func newAnalyticsEventEndpointDispatcher( loggers ldlog.Loggers, remotePath string, eventQueueCleanupInterval time.Duration, + eventMetrics EventMetrics, ) *analyticsEventEndpointDispatcher { return &analyticsEventEndpointDispatcher{ authKey: authKey, @@ -290,6 +297,7 @@ func newAnalyticsEventEndpointDispatcher( loggers: loggers, remotePath: remotePath, eventQueueCleanupInterval: eventQueueCleanupInterval, + eventMetrics: eventMetrics, } } @@ -299,6 +307,7 @@ func newEventVerbatimRelay( httpConfig httpconfig.HTTPConfig, loggers ldlog.Loggers, remotePath string, + extraOptions ...OptionType, ) *eventVerbatimRelay { eventsURI := getEventsURI(config) opts := []OptionType{ @@ -307,6 +316,7 @@ func newEventVerbatimRelay( OptionURIPath(remotePath), OptionFlushInterval(config.FlushInterval.GetOrElse(c.DefaultEventsFlushInterval)), } + opts = append(opts, extraOptions...) publisher, _ := NewHTTPEventPublisher(authKey, httpConfig, loggers, opts...) 
diff --git a/internal/events/event-relay_test.go b/internal/events/event-relay_test.go index 5a6687f9..a1e2ded3 100644 --- a/internal/events/event-relay_test.go +++ b/internal/events/event-relay_test.go @@ -122,6 +122,7 @@ func eventRelayTestWithOptions( httpConfig, wrapper, opts.eventQueueCleanupInterval, + nil, ) defer dispatcher.Close() diff --git a/internal/events/event_metrics.go b/internal/events/event_metrics.go new file mode 100644 index 00000000..e07ae96f --- /dev/null +++ b/internal/events/event_metrics.go @@ -0,0 +1,39 @@ +package events + +import ( + ldevents "github.com/launchdarkly/go-sdk-events/v3" +) + +// EventSendFailureMetadata is an alias for ldevents.EventSendFailureMetadata so that the +// ld-relay EventMetrics interface is structurally identical to the go-sdk-events one. +// This allows a single concrete type to satisfy both interfaces. +type EventSendFailureMetadata = ldevents.EventSendFailureMetadata + +// EventMetrics defines an interface for receiving metrics about event processing. This is used by +// the metrics package to record telemetry (e.g. via OpenTelemetry) about events that are dropped, +// sent, or otherwise processed by the relay's event forwarding pipeline. +// +// This interface mirrors the one in go-sdk-events. The metrics package provides a single concrete +// implementation that satisfies both via Go structural typing. +type EventMetrics interface { + // RecordDroppedEvents is called when events are discarded because the event buffer has reached + // its configured capacity. The count parameter indicates how many events were dropped. + RecordDroppedEvents(count int) + + // RecordEventsSent is called when a batch of events has been successfully delivered to the + // events service. The count parameter indicates how many events were in the batch. + RecordEventsSent(count int) + + // RecordEventsFailedSend is called when a batch of events could not be delivered to the + // events service after all retry attempts. 
The count parameter indicates how many events + // were in the failed batch. The metadata parameter provides additional context about the failure. + RecordEventsFailedSend(count int, metadata EventSendFailureMetadata) + + // RecordEventsBytesSent is called when a batch of events has been successfully delivered. + // The bytes parameter is the size of the serialized event payload before compression. + RecordEventsBytesSent(bytes int) + + // RecordPendingEvents is called after any operation that changes the number of events + // buffered awaiting delivery. The count parameter is the current total number of events pending. + RecordPendingEvents(count int) +} diff --git a/internal/events/event_publisher.go b/internal/events/event_publisher.go index fd373ced..6ed78a19 100644 --- a/internal/events/event_publisher.go +++ b/internal/events/event_publisher.go @@ -103,10 +103,11 @@ type HTTPEventPublisher struct { disableQueue chan interface{} disabled bool - queues map[EventPayloadMetadata]*publisherQueue - capacity int - overflowed bool - lock sync.RWMutex + queues map[EventPayloadMetadata]*publisherQueue + capacity int + overflowed bool + eventMetrics EventMetrics + lock sync.RWMutex } type eventBatch struct { @@ -159,6 +160,17 @@ func (o OptionCapacity) apply(p *HTTPEventPublisher) error { return nil } +// OptionEventMetrics provides an EventMetrics implementation for recording metrics about event +// processing such as dropped event counts. +type OptionEventMetrics struct { + EventMetrics EventMetrics +} + +func (o OptionEventMetrics) apply(p *HTTPEventPublisher) error { + p.eventMetrics = o.EventMetrics + return nil +} + // NewHTTPEventPublisher creates a new HTTPEventPublisher. 
func NewHTTPEventPublisher(authKey credential.SDKCredential, httpConfig httpconfig.HTTPConfig, loggers ldlog.Loggers, options ...OptionType) (*HTTPEventPublisher, error) { closer := make(chan struct{}) @@ -257,10 +269,24 @@ func (p *HTTPEventPublisher) append(batch eventBatch) { p.overflowed = true } taken = available + if dropped := len(batch.events) - taken; dropped > 0 && p.eventMetrics != nil { + p.eventMetrics.RecordDroppedEvents(dropped) + } } else { p.overflowed = false } queue.events = append(queue.events, batch.events[:taken]...) + if p.eventMetrics != nil { + p.eventMetrics.RecordPendingEvents(p.totalPendingEvents()) + } +} + +func (p *HTTPEventPublisher) totalPendingEvents() int { + total := 0 + for _, q := range p.queues { + total += len(q.events) + } + return total } func (p *HTTPEventPublisher) ReplaceCredential(newCredential credential.SDKCredential) { //nolint:revive // method is already documented in interface @@ -338,6 +364,7 @@ func (p *HTTPEventPublisher) flush() { return ret } + eventMetrics := p.eventMetrics go func() { // The EventSender created by ldevents.NewDefaultEventSender implements the standard retry behavior, // and error logging, in its SendEventData method. 
Retries could cause this call to block for a while, @@ -351,12 +378,25 @@ func (p *HTTPEventPublisher) flush() { EnableCompression: true, } result := ldevents.SendEventDataWithRetry(sendConfig, ldevents.AnalyticsEventDataKind, p.uriPath, payload, count) + if eventMetrics != nil { + if result.Success { + eventMetrics.RecordEventsSent(count) + eventMetrics.RecordEventsBytesSent(len(payload)) + } else { + eventMetrics.RecordEventsFailedSend(count, EventSendFailureMetadata{ + StatusCode: result.StatusCode, + }) + } + } p.wg.Done() if result.MustShutDown { p.disableQueue <- struct{}{} } }() } + if p.eventMetrics != nil { + p.eventMetrics.RecordPendingEvents(p.totalPendingEvents()) + } } func (p *HTTPEventPublisher) Close() { //nolint:revive // method is already documented in interface diff --git a/internal/events/event_publisher_test.go b/internal/events/event_publisher_test.go index c44e9ba2..2857657f 100644 --- a/internal/events/event_publisher_test.go +++ b/internal/events/event_publisher_test.go @@ -189,6 +189,148 @@ func TestHTTPEventPublisherCapacity(t *testing.T) { }) } +type mockEventMetrics struct { + droppedCount int + sentCount int + failedSendCount int + lastFailedSendMeta EventSendFailureMetadata + bytesSent int + lastPendingEvents int +} + +func (m *mockEventMetrics) RecordDroppedEvents(count int) { + m.droppedCount += count +} + +func (m *mockEventMetrics) RecordEventsSent(count int) { + m.sentCount += count +} + +func (m *mockEventMetrics) RecordEventsFailedSend(count int, metadata EventSendFailureMetadata) { + m.failedSendCount += count + m.lastFailedSendMeta = metadata +} + +func (m *mockEventMetrics) RecordEventsBytesSent(bytes int) { + m.bytesSent += bytes +} + +func (m *mockEventMetrics) RecordPendingEvents(depth int) { + m.lastPendingEvents = depth +} + +func TestHTTPEventPublisherDroppedEventsMetric(t *testing.T) { + mockLog := ldlogtest.NewMockLog() + defer mockLog.DumpIfTestFailed(t) + handler, requestsCh := 
httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(202)) + httphelpers.WithServer(handler, func(server *httptest.Server) { + metrics := &mockEventMetrics{} + publisher, _ := NewHTTPEventPublisher(config.SDKKey("my-key"), defaultHTTPConfig(), mockLog.Loggers, + OptionBaseURI(server.URL), OptionCapacity(2), OptionEventMetrics{EventMetrics: metrics}) + defer publisher.Close() + + // Publish 5 events with capacity 2 — should drop 3 + publisher.Publish(EventPayloadMetadata{}, + json.RawMessage(`"a"`), json.RawMessage(`"b"`), + json.RawMessage(`"c"`), json.RawMessage(`"d"`), json.RawMessage(`"e"`)) + publisher.Flush() + + r := helpers.RequireValue(t, requestsCh, time.Second) + uncompressed, err := util.DecompressGzipData(r.Body) + assert.NoError(t, err) + m.In(t).Assert(uncompressed, m.JSONStrEqual(`["a","b"]`)) + + assert.Equal(t, 3, metrics.droppedCount) + }) +} + +func TestHTTPEventPublisherEventsSentMetric(t *testing.T) { + mockLog := ldlogtest.NewMockLog() + defer mockLog.DumpIfTestFailed(t) + handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(202)) + httphelpers.WithServer(handler, func(server *httptest.Server) { + metrics := &mockEventMetrics{} + publisher, _ := NewHTTPEventPublisher(config.SDKKey("my-key"), defaultHTTPConfig(), mockLog.Loggers, + OptionBaseURI(server.URL), OptionEventMetrics{EventMetrics: metrics}) + defer publisher.Close() + + publisher.Publish(EventPayloadMetadata{}, json.RawMessage(`"a"`), json.RawMessage(`"b"`), json.RawMessage(`"c"`)) + publisher.Flush() + + _ = helpers.RequireValue(t, requestsCh, time.Second) + // Wait for the goroutine to record the metric after the send completes + assert.Eventually(t, func() bool { return metrics.sentCount == 3 }, time.Second, 10*time.Millisecond) + assert.Greater(t, metrics.bytesSent, 0) + }) +} + +func TestHTTPEventPublisherPendingEventsMetric(t *testing.T) { + mockLog := ldlogtest.NewMockLog() + defer mockLog.DumpIfTestFailed(t) + handler, requestsCh := 
httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(202)) + httphelpers.WithServer(handler, func(server *httptest.Server) { + metrics := &mockEventMetrics{} + publisher, _ := NewHTTPEventPublisher(config.SDKKey("my-key"), defaultHTTPConfig(), mockLog.Loggers, + OptionBaseURI(server.URL), OptionEventMetrics{EventMetrics: metrics}) + defer publisher.Close() + + // Publish 3 events — queue depth should be 3 after processing + publisher.Publish(EventPayloadMetadata{}, json.RawMessage(`"a"`), json.RawMessage(`"b"`), json.RawMessage(`"c"`)) + // Flush will clear the queue — depth should go to 0 + publisher.Flush() + + _ = helpers.RequireValue(t, requestsCh, time.Second) + assert.Eventually(t, func() bool { return metrics.lastPendingEvents == 0 }, time.Second, 10*time.Millisecond) + }) +} + +func TestHTTPEventPublisherEventsFailedSendMetric(t *testing.T) { + mockLog := ldlogtest.NewMockLog() + defer mockLog.DumpIfTestFailed(t) + // Return 503 twice (exhausts retries) so the send fails + handler, requestsCh := httphelpers.RecordingHandler( + httphelpers.SequentialHandler( + httphelpers.HandlerWithStatus(503), + httphelpers.HandlerWithStatus(503), + ), + ) + httphelpers.WithServer(handler, func(server *httptest.Server) { + metrics := &mockEventMetrics{} + publisher, _ := NewHTTPEventPublisher(config.SDKKey("my-key"), defaultHTTPConfig(), mockLog.Loggers, + OptionBaseURI(server.URL), OptionEventMetrics{EventMetrics: metrics}) + defer publisher.Close() + + publisher.Publish(EventPayloadMetadata{}, json.RawMessage(`"a"`), json.RawMessage(`"b"`)) + publisher.Flush() + + // Wait for both retry attempts + _ = helpers.RequireValue(t, requestsCh, 5*time.Second) + _ = helpers.RequireValue(t, requestsCh, 5*time.Second) + + assert.Eventually(t, func() bool { return metrics.failedSendCount == 2 }, 5*time.Second, 10*time.Millisecond) + assert.Equal(t, 0, metrics.sentCount) + assert.Equal(t, 503, metrics.lastFailedSendMeta.StatusCode) + }) +} + +func
TestHTTPEventPublisherDroppedEventsMetricNilIsNoOp(t *testing.T) { + mockLog := ldlogtest.NewMockLog() + defer mockLog.DumpIfTestFailed(t) + handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(202)) + httphelpers.WithServer(handler, func(server *httptest.Server) { + // No EventMetrics option — should not panic + publisher, _ := NewHTTPEventPublisher(config.SDKKey("my-key"), defaultHTTPConfig(), mockLog.Loggers, + OptionBaseURI(server.URL), OptionCapacity(1)) + defer publisher.Close() + publisher.Publish(EventPayloadMetadata{}, json.RawMessage(`"hello"`), json.RawMessage(`"goodbye"`)) + publisher.Flush() + r := helpers.RequireValue(t, requestsCh, time.Second) + uncompressed, err := util.DecompressGzipData(r.Body) + assert.NoError(t, err) + m.In(t).Assert(uncompressed, m.JSONStrEqual(`["hello"]`)) + }) +} + func TestHTTPEventPublisherErrorRetry(t *testing.T) { testRecoverableError := func(t *testing.T, errorHandler http.Handler) { mockLog := ldlogtest.NewMockLog() diff --git a/internal/events/summarizing-relay.go b/internal/events/summarizing-relay.go index 58d89bc6..055dc473 100644 --- a/internal/events/summarizing-relay.go +++ b/internal/events/summarizing-relay.go @@ -65,6 +65,7 @@ func newEventSummarizingRelay( loggers ldlog.Loggers, remotePath string, eventQueueCleanupInterval time.Duration, + eventMetrics EventMetrics, ) *eventSummarizingRelay { eventsConfig := ldevents.EventsConfiguration{ Capacity: config.Capacity.GetOrElse(c.DefaultEventCapacity), @@ -72,6 +73,7 @@ func newEventSummarizingRelay( Loggers: loggers, UserKeysCapacity: ldcomponents.DefaultContextKeysCapacity, UserKeysFlushInterval: ldcomponents.DefaultContextKeysFlushInterval, + EventMetrics: eventMetrics, } baseHeaders := make(http.Header) for k, v := range httpConfig.SDKHTTPConfig.DefaultHeaders { diff --git a/internal/metrics/constants.go b/internal/metrics/constants.go index 36c5f6bc..bfc39fa0 100644 --- a/internal/metrics/constants.go +++ 
b/internal/metrics/constants.go @@ -12,7 +12,12 @@ const ( connMeasureName = "launchdarkly.relay.connections" requestMeasureName = "launchdarkly.relay.requests" requestDurationMeasureName = "launchdarkly.relay.request.duration" - eventsIngestedBytesMeasureName = "launchdarkly.relay.events.ingested.bytes" + eventsReceivedBytesMeasureName = "launchdarkly.relay.events.received.bytes" + eventsSentCountMeasureName = "launchdarkly.relay.events.sent.count" + eventsSentBytesMeasureName = "launchdarkly.relay.events.sent.bytes" + eventsSentFailuresMeasureName = "launchdarkly.relay.events.sent.failures" + eventsSentDroppedMeasureName = "launchdarkly.relay.events.sent.dropped" + eventsSentPendingMeasureName = "launchdarkly.relay.events.sent.pending" defaultFlushInterval = time.Minute @@ -32,6 +37,7 @@ var ( applicationIDAttrKey = attribute.Key("application.id") //nolint:gochecknoglobals applicationVersionAttrKey = attribute.Key("application.version") //nolint:gochecknoglobals instanceIDAttrKey = attribute.Key("instanceId") //nolint:gochecknoglobals + statusCodeAttrKey = attribute.Key("statusCode") //nolint:gochecknoglobals ) // buildAttributes creates an OTel attribute set from base key-values plus per-request attributes. 
diff --git a/internal/metrics/measures.go b/internal/metrics/measures.go index dd8f3b8a..01f00455 100644 --- a/internal/metrics/measures.go +++ b/internal/metrics/measures.go @@ -2,8 +2,11 @@ package metrics import ( "context" + "strconv" "time" + ldevents "github.com/launchdarkly/go-sdk-events/v3" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" ) @@ -13,6 +16,11 @@ type Instruments struct { requests metric.Int64Counter // cumulative HTTP requests requestDuration metric.Float64Histogram // request duration in seconds eventsIngestedBytes metric.Int64Counter // cumulative bytes of event data ingested + eventsDropped metric.Int64Counter // cumulative count of events dropped due to capacity overflow + eventsSent metric.Int64Counter // cumulative count of events successfully sent + eventsFailedSend metric.Int64Counter // cumulative count of events that failed to send + eventsBytesSent metric.Int64Counter // cumulative bytes of event payloads successfully sent + pendingEvents metric.Int64Gauge // current number of events pending delivery } // Measure identifies what to record. 
Each pre-defined Measure var specifies which @@ -70,7 +78,27 @@ func NewInstrumentsForTest(meter metric.Meter) (*Instruments, error) { if err != nil { return nil, err } - eventsIngestedBytes, err := meter.Int64Counter(eventsIngestedBytesMeasureName) + eventsIngestedBytes, err := meter.Int64Counter(eventsReceivedBytesMeasureName) + if err != nil { + return nil, err + } + eventsDropped, err := meter.Int64Counter(eventsSentDroppedMeasureName) + if err != nil { + return nil, err + } + eventsSent, err := meter.Int64Counter(eventsSentCountMeasureName) + if err != nil { + return nil, err + } + eventsFailedSend, err := meter.Int64Counter(eventsSentFailuresMeasureName) + if err != nil { + return nil, err + } + eventsBytesSent, err := meter.Int64Counter(eventsSentBytesMeasureName) + if err != nil { + return nil, err + } + pendingEvents, err := meter.Int64Gauge(eventsSentPendingMeasureName) if err != nil { return nil, err } @@ -79,6 +107,11 @@ func NewInstrumentsForTest(meter metric.Meter) (*Instruments, error) { requests: requests, requestDuration: requestDuration, eventsIngestedBytes: eventsIngestedBytes, + eventsDropped: eventsDropped, + eventsSent: eventsSent, + eventsFailedSend: eventsFailedSend, + eventsBytesSent: eventsBytesSent, + pendingEvents: pendingEvents, }, nil } @@ -177,3 +210,61 @@ func RecordRequestDuration(ctx context.Context, instruments *Instruments, em *En attrs := buildRequestAttributes(em.envKVs, measure.platformCategory, ua, wrapper, route, method, appID, appVersion, instanceID) instruments.requestDuration.Record(ctx, duration.Seconds(), metric.WithAttributeSet(attrs)) } + +// EventMetricsRecorder implements the EventMetrics interface defined in both the events package +// and go-sdk-events, recording event processing metrics via OTEL instruments. It uses +// environment-level attributes only (relayId, env) since event drops occur asynchronously, +// detached from any specific HTTP request context. 
+type EventMetricsRecorder struct { + instruments *Instruments + envKVs []attribute.KeyValue +} + +// RecordDroppedEvents records the number of events dropped due to capacity overflow. +func (r *EventMetricsRecorder) RecordDroppedEvents(count int) { + if r.instruments == nil || count <= 0 { + return + } + attrs := attribute.NewSet(r.envKVs...) + r.instruments.eventsDropped.Add(context.Background(), int64(count), metric.WithAttributeSet(attrs)) +} + +// RecordEventsSent records the number of events successfully delivered to the events service. +func (r *EventMetricsRecorder) RecordEventsSent(count int) { + if r.instruments == nil || count <= 0 { + return + } + attrs := attribute.NewSet(r.envKVs...) + r.instruments.eventsSent.Add(context.Background(), int64(count), metric.WithAttributeSet(attrs)) +} + +// RecordPendingEvents records the current number of events pending delivery. +func (r *EventMetricsRecorder) RecordPendingEvents(depth int) { + if r.instruments == nil { + return + } + attrs := attribute.NewSet(r.envKVs...) + r.instruments.pendingEvents.Record(context.Background(), int64(depth), metric.WithAttributeSet(attrs)) +} + +// RecordEventsBytesSent records the size of event payloads successfully delivered. +func (r *EventMetricsRecorder) RecordEventsBytesSent(bytes int) { + if r.instruments == nil || bytes <= 0 { + return + } + attrs := attribute.NewSet(r.envKVs...) + r.instruments.eventsBytesSent.Add(context.Background(), int64(bytes), metric.WithAttributeSet(attrs)) +} + +// RecordEventsFailedSend records the number of events that could not be delivered after all retries. +// The status code from the metadata is included as an attribute on the metric. 
+func (r *EventMetricsRecorder) RecordEventsFailedSend(count int, metadata ldevents.EventSendFailureMetadata) { + if r.instruments == nil || count <= 0 { + return + } + kvs := make([]attribute.KeyValue, len(r.envKVs), len(r.envKVs)+1) + copy(kvs, r.envKVs) + kvs = append(kvs, statusCodeAttrKey.String(strconv.Itoa(metadata.StatusCode))) + attrs := attribute.NewSet(kvs...) + r.instruments.eventsFailedSend.Add(context.Background(), int64(count), metric.WithAttributeSet(attrs)) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 8eb31d41..1fb8b7be 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -102,15 +102,32 @@ func NewManager( requestDuration, _ := meter.Float64Histogram(requestDurationMeasureName, otelmetric.WithDescription("request duration in seconds"), otelmetric.WithUnit("s")) - eventsIngestedBytes, _ := meter.Int64Counter(eventsIngestedBytesMeasureName, + eventsIngestedBytes, _ := meter.Int64Counter(eventsReceivedBytesMeasureName, otelmetric.WithDescription("cumulative bytes of event data ingested"), otelmetric.WithUnit("By")) + eventsDropped, _ := meter.Int64Counter(eventsSentDroppedMeasureName, + otelmetric.WithDescription("cumulative count of events dropped due to capacity overflow")) + eventsSent, _ := meter.Int64Counter(eventsSentCountMeasureName, + otelmetric.WithDescription("cumulative count of events successfully sent")) + eventsFailedSend, _ := meter.Int64Counter(eventsSentFailuresMeasureName, + otelmetric.WithDescription("cumulative count of events that failed to send after all retries")) + eventsBytesSent, _ := meter.Int64Counter(eventsSentBytesMeasureName, + otelmetric.WithDescription("cumulative bytes of event payloads successfully sent"), + otelmetric.WithUnit("By")) + pendingEvents, _ := meter.Int64Gauge(eventsSentPendingMeasureName, + otelmetric.WithDescription("current number of events buffered in the queue")) + instruments := &Instruments{ connections: connections, requests: requests, 
requestDuration: requestDuration, eventsIngestedBytes: eventsIngestedBytes, + eventsDropped: eventsDropped, + eventsSent: eventsSent, + eventsFailedSend: eventsFailedSend, + eventsBytesSent: eventsBytesSent, + pendingEvents: pendingEvents, } usageChan := make(chan any, 256) @@ -274,6 +291,13 @@ func (em *EnvironmentManager) GetAttributes() attribute.Set { return attribute.NewSet(em.envKVs...) } +// NewEventMetricsRecorder creates an EventMetricsRecorder that records event processing metrics +// with this environment's attributes. The returned recorder satisfies the EventMetrics interfaces +// defined in both the events package and go-sdk-events. +func (em *EnvironmentManager) NewEventMetricsRecorder(instruments *Instruments) *EventMetricsRecorder { + return &EventMetricsRecorder{instruments: instruments, envKVs: em.envKVs} +} + // FlushEventsExporter is used in testing to trigger the collector to post data to the event publisher. func (em *EnvironmentManager) FlushEventsExporter() { if em.collector != nil { diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index e45f83d1..89be3c20 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -183,7 +183,7 @@ func TestRecordEventsIngestedBytes(t *testing.T) { rm, err := p.collectMetrics() require.NoError(t, err) - m := findMetric(rm, eventsIngestedBytesMeasureName) + m := findMetric(rm, eventsReceivedBytesMeasureName) require.NotNil(t, m, "events ingested bytes metric not found") sum, ok := m.Data.(metricdata.Sum[int64]) require.True(t, ok, "expected Sum[int64] data") diff --git a/internal/metrics/test_utils_test.go b/internal/metrics/test_utils_test.go index 066d839a..3f436f66 100644 --- a/internal/metrics/test_utils_test.go +++ b/internal/metrics/test_utils_test.go @@ -55,7 +55,7 @@ func testWithOTel(t *testing.T, action func(testWithOTelParams)) { connections, _ := meter.Int64UpDownCounter(connMeasureName) requests, _ := 
meter.Int64Counter(requestMeasureName) requestDuration, _ := meter.Float64Histogram(requestDurationMeasureName) - eventsIngestedBytes, _ := meter.Int64Counter(eventsIngestedBytesMeasureName) + eventsIngestedBytes, _ := meter.Int64Counter(eventsReceivedBytesMeasureName) instruments := &Instruments{ connections: connections, diff --git a/internal/middleware/metrics_middleware_test.go b/internal/middleware/metrics_middleware_test.go index 06f61d6e..7696044c 100644 --- a/internal/middleware/metrics_middleware_test.go +++ b/internal/middleware/metrics_middleware_test.go @@ -198,8 +198,8 @@ func TestEventBytesMetrics(t *testing.T) { router.ServeHTTP(httptest.NewRecorder(), req) rm := p.collectMetrics(t) - bytesMetric := st.FindMetricByName(rm, "launchdarkly.relay.events.ingested.bytes") - require.NotNil(t, bytesMetric, "events ingested bytes metric not found") + bytesMetric := st.FindMetricByName(rm, "launchdarkly.relay.events.received.bytes") + require.NotNil(t, bytesMetric, "events received bytes metric not found") assertMetricHasValue(t, bytesMetric, p.envName, "server", 35) }) } diff --git a/internal/relayenv/env_context_impl.go b/internal/relayenv/env_context_impl.go index 0ea92080..1dfa46c8 100644 --- a/internal/relayenv/env_context_impl.go +++ b/internal/relayenv/env_context_impl.go @@ -286,28 +286,6 @@ func NewEnvContext( wrapper := datadestination.NewDataDesinationWrapper(envStreamUpdates) envContext.wrapper = wrapper - var eventDispatcher *events.EventDispatcher - if allConfig.Events.SendEvents { - if offlineMode { - envLoggers.Info("Events will be accepted for this environment, but will be discarded, since offline mode is enabled") - } else { - envLoggers.Info("Proxying events for this environment") - eventLoggers := envLoggers - eventLoggers.SetPrefix(logPrefix + " (event proxy)") - eventDispatcher = events.NewEventDispatcher( - envConfig.SDKKey, - envConfig.MobileKey, - envConfig.EnvID, - envLoggers, - allConfig.Events, - httpConfig, - wrapper, - 0, // 0 
here means "use the default interval for any periodic cleanup task you may need to run" - ) - } - } - envContext.eventDispatcher = eventDispatcher - streamURI := allConfig.Main.StreamURI.String() // config.ValidateConfig has ensured that this has a value baseURI := allConfig.Main.BaseURI.String() eventsURI := allConfig.Events.EventsURI.String() // ditto @@ -343,6 +321,37 @@ func NewEnvContext( envContext.metricsEnv = em + // Create an EventMetrics recorder for the event dispatchers to use when reporting + // internal metrics like dropped events. This must be done after the EnvironmentManager + // is created so we have access to the environment-level OTEL attributes. + var eventMetrics events.EventMetrics + if em != nil { + eventMetrics = em.NewEventMetricsRecorder(params.MetricsManager.GetInstruments()) + } + + var eventDispatcher *events.EventDispatcher + if allConfig.Events.SendEvents { + if offlineMode { + envLoggers.Info("Events will be accepted for this environment, but will be discarded, since offline mode is enabled") + } else { + envLoggers.Info("Proxying events for this environment") + eventLoggers := envLoggers + eventLoggers.SetPrefix(logPrefix + " (event proxy)") + eventDispatcher = events.NewEventDispatcher( + envConfig.SDKKey, + envConfig.MobileKey, + envConfig.EnvID, + envLoggers, + allConfig.Events, + httpConfig, + wrapper, + 0, // 0 here means "use the default interval for any periodic cleanup task you may need to run" + eventMetrics, + ) + } + } + envContext.eventDispatcher = eventDispatcher + disconnectedStatusTime := allConfig.Main.DisconnectedStatusTime.GetOrElse(config.DefaultDisconnectedStatusTime) streamingBuilder := ldcomponents.StreamingDataSourceV2().BaseURI(streamURI) From 666f7603a8bf1020b56d43de155ffe815295f184 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 1 Apr 2026 14:02:04 -0400 Subject: [PATCH 02/10] fix: Use NewInstrumentsForTest in testWithOTel helper The test helper was manually constructing a partial Instruments 
struct with only the original 4 fields, leaving the 5 new event metric instruments as nil. This could cause panics if any test exercises EventMetricsRecorder through this helper, since the recorder nil-checks the struct pointer but not individual instrument fields. Switch to NewInstrumentsForTest which creates a complete Instruments struct with all fields populated. Add TestEventMetricsRecorderViaTestHelper to exercise the recorder through the test helper and catch any future regressions. --- internal/metrics/metrics_test.go | 26 ++++++++++++++++++++++++++ internal/metrics/test_utils_test.go | 13 ++----------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index 89be3c20..9e0b1f4b 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -201,6 +201,32 @@ func TestRecordEventsIngestedBytes(t *testing.T) { }) } +func TestEventMetricsRecorderViaTestHelper(t *testing.T) { + testWithOTel(t, func(p testWithOTelParams) { + recorder := p.env.NewEventMetricsRecorder(p.instruments) + + recorder.RecordDroppedEvents(5) + recorder.RecordEventsSent(10) + recorder.RecordEventsBytesSent(2048) + recorder.RecordPendingEvents(3) + + rm, err := p.collectMetrics() + require.NoError(t, err) + + droppedMetric := findMetric(rm, eventsSentDroppedMeasureName) + require.NotNil(t, droppedMetric, "events dropped metric not found") + + sentMetric := findMetric(rm, eventsSentCountMeasureName) + require.NotNil(t, sentMetric, "events sent metric not found") + + bytesMetric := findMetric(rm, eventsSentBytesMeasureName) + require.NotNil(t, bytesMetric, "events bytes sent metric not found") + + pendingMetric := findMetric(rm, eventsSentPendingMeasureName) + require.NotNil(t, pendingMetric, "events pending metric not found") + }) +} + func TestSanitizeTagValue(t *testing.T) { assert.Equal(t, "abc", sanitizeTagValue("abc")) assert.Equal(t, "not-provided", sanitizeTagValue("")) diff --git 
a/internal/metrics/test_utils_test.go b/internal/metrics/test_utils_test.go index 3f436f66..af62dbb3 100644 --- a/internal/metrics/test_utils_test.go +++ b/internal/metrics/test_utils_test.go @@ -52,17 +52,8 @@ func testWithOTel(t *testing.T, action func(testWithOTelParams)) { meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) meter := meterProvider.Meter("ld-relay") - connections, _ := meter.Int64UpDownCounter(connMeasureName) - requests, _ := meter.Int64Counter(requestMeasureName) - requestDuration, _ := meter.Float64Histogram(requestDurationMeasureName) - eventsIngestedBytes, _ := meter.Int64Counter(eventsReceivedBytesMeasureName) - - instruments := &Instruments{ - connections: connections, - requests: requests, - requestDuration: requestDuration, - eventsIngestedBytes: eventsIngestedBytes, - } + instruments, err := NewInstrumentsForTest(meter) + require.NoError(t, err) manager, err := NewManager(config.OpenTelemetryConfig{}, time.Millisecond*10, mockLog.Loggers) require.NoError(t, err) From 46913d7c68a60c29f4f05c86a9a41d1d29f333b1 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 1 Apr 2026 15:05:05 -0400 Subject: [PATCH 03/10] use published events module --- go.mod | 4 +--- go.sum | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 06edf810..2add4a3e 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/launchdarkly/ld-relay/v8 go 1.25.0 -replace github.com/launchdarkly/go-sdk-events/v3 => /home/mkeeler/code/launchdarkly/go-sdk-events.git/enhanced-telemetry - require ( github.com/armon/go-metrics v0.4.1 // indirect github.com/aws/aws-sdk-go-v2 v1.24.1 @@ -27,7 +25,7 @@ require ( github.com/launchdarkly/go-configtypes v1.2.0 github.com/launchdarkly/go-jsonstream/v3 v3.1.0 github.com/launchdarkly/go-sdk-common/v3 v3.4.0 - github.com/launchdarkly/go-sdk-events/v3 v3.5.0 + github.com/launchdarkly/go-sdk-events/v3 v3.6.0 github.com/launchdarkly/go-server-sdk-consul/v3 v3.0.0 
github.com/launchdarkly/go-server-sdk-dynamodb/v4 v4.0.0 github.com/launchdarkly/go-server-sdk-evaluation/v3 v3.0.1 diff --git a/go.sum b/go.sum index e2262dd8..a0c37c4f 100644 --- a/go.sum +++ b/go.sum @@ -280,6 +280,8 @@ github.com/launchdarkly/go-ntlmssp v1.0.3 h1:rFxOnnEJ2DzJ+NU0plhXqnldJUwn3wWJFTW github.com/launchdarkly/go-ntlmssp v1.0.3/go.mod h1:P1z6fX/y9zgBvfnZP7AKWilW9AX5M3czsa1S4Zpp2nM= github.com/launchdarkly/go-sdk-common/v3 v3.4.0 h1:GTRulE0G43xdWY1QdjAXJ7QnZ8PMFU8pOWZICCydEtM= github.com/launchdarkly/go-sdk-common/v3 v3.4.0/go.mod h1:6MNeeP8b2VtsM6I3TbShCHW/+tYh2c+p5dB+ilS69sg= +github.com/launchdarkly/go-sdk-events/v3 v3.6.0 h1:6AeoJot+bfptz4gtwPvPOXwwEGj9K1SLEGERPRaoNKU= +github.com/launchdarkly/go-sdk-events/v3 v3.6.0/go.mod h1:/N3ZUpfd1a+NrnJiO+812WrG5RQQNYP9qlgtI4I/IE4= github.com/launchdarkly/go-semver v1.0.3 h1:agIy/RN3SqeQDIfKkl+oFslEdeIs7pgsJBs3CdCcGQM= github.com/launchdarkly/go-semver v1.0.3/go.mod h1:xFmMwXba5Mb+3h72Z+VeSs9ahCvKo2QFUTHRNHVqR28= github.com/launchdarkly/go-server-sdk-consul/v3 v3.0.0 h1:AXmmU4rsMxdA75o4a9p+7Pl3SzdfUCLIw7CM7pBRifE= From b57b50897ad690cbd02d05c8c988c7b9dc7aeed8 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 1 Apr 2026 15:46:49 -0400 Subject: [PATCH 04/10] add missing permissions --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4c03c497..f4391f2b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,6 +47,9 @@ jobs: integration-test: needs: go-versions uses: ./.github/workflows/integration-test.yml + permissions: + id-token: write + contents: read with: environment: 'staging' go-version: ${{ needs.go-versions.outputs.latest }} From 3d42166ff8fef83bbf6beea40d0946fbe96afaee Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 1 Apr 2026 16:19:13 -0400 Subject: [PATCH 05/10] fix: Add mutex to mockEventMetrics to prevent data races The mock's fields are written by the event loop goroutine 
and flush goroutines while tests read them concurrently via assert.Eventually. Add a sync.Mutex and thread-safe accessor methods to prevent races under go test -race. --- internal/events/event_publisher_test.go | 62 ++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/internal/events/event_publisher_test.go b/internal/events/event_publisher_test.go index 2857657f..308561b3 100644 --- a/internal/events/event_publisher_test.go +++ b/internal/events/event_publisher_test.go @@ -6,6 +6,7 @@ import ( "net/http/httptest" "sort" "strconv" + "sync" "testing" "time" @@ -190,6 +191,7 @@ func TestHTTPEventPublisherCapacity(t *testing.T) { } type mockEventMetrics struct { + mu sync.Mutex droppedCount int sentCount int failedSendCount int @@ -199,26 +201,72 @@ type mockEventMetrics struct { } func (m *mockEventMetrics) RecordDroppedEvents(count int) { + m.mu.Lock() + defer m.mu.Unlock() m.droppedCount += count } func (m *mockEventMetrics) RecordEventsSent(count int) { + m.mu.Lock() + defer m.mu.Unlock() m.sentCount += count } func (m *mockEventMetrics) RecordEventsFailedSend(count int, metadata EventSendFailureMetadata) { + m.mu.Lock() + defer m.mu.Unlock() m.failedSendCount += count m.lastFailedSendMeta = metadata } func (m *mockEventMetrics) RecordEventsBytesSent(bytes int) { + m.mu.Lock() + defer m.mu.Unlock() m.bytesSent += bytes } func (m *mockEventMetrics) RecordPendingEvents(depth int) { + m.mu.Lock() + defer m.mu.Unlock() m.lastPendingEvents = depth } +func (m *mockEventMetrics) getDroppedCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.droppedCount +} + +func (m *mockEventMetrics) getSentCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.sentCount +} + +func (m *mockEventMetrics) getFailedSendCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.failedSendCount +} + +func (m *mockEventMetrics) getLastFailedSendMeta() EventSendFailureMetadata { + m.mu.Lock() + defer m.mu.Unlock() + return 
m.lastFailedSendMeta +} + +func (m *mockEventMetrics) getBytesSent() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.bytesSent +} + +func (m *mockEventMetrics) getLastPendingEvents() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.lastPendingEvents +} + func TestHTTPEventPublisherDroppedEventsMetric(t *testing.T) { mockLog := ldlogtest.NewMockLog() defer mockLog.DumpIfTestFailed(t) @@ -240,7 +288,7 @@ func TestHTTPEventPublisherDroppedEventsMetric(t *testing.T) { assert.NoError(t, err) m.In(t).Assert(uncompressed, m.JSONStrEqual(`["a","b"]`)) - assert.Equal(t, 3, metrics.droppedCount) + assert.Equal(t, 3, metrics.getDroppedCount()) }) } @@ -259,8 +307,8 @@ func TestHTTPEventPublisherEventsSentMetric(t *testing.T) { _ = helpers.RequireValue(t, requestsCh, time.Second) // Wait for the goroutine to record the metric after the send completes - assert.Eventually(t, func() bool { return metrics.sentCount == 3 }, time.Second, 10*time.Millisecond) - assert.Greater(t, metrics.bytesSent, 0) + assert.Eventually(t, func() bool { return metrics.getSentCount() == 3 }, time.Second, 10*time.Millisecond) + assert.Greater(t, metrics.getBytesSent(), 0) }) } @@ -280,7 +328,7 @@ func TestHTTPEventPublisherPendingEventsMetric(t *testing.T) { publisher.Flush() _ = helpers.RequireValue(t, requestsCh, time.Second) - assert.Eventually(t, func() bool { return metrics.lastPendingEvents == 0 }, time.Second, 10*time.Millisecond) + assert.Eventually(t, func() bool { return metrics.getLastPendingEvents() == 0 }, time.Second, 10*time.Millisecond) }) } @@ -307,9 +355,9 @@ func TestHTTPEventPublisherEventsFailedSendMetric(t *testing.T) { _ = helpers.RequireValue(t, requestsCh, 5*time.Second) _ = helpers.RequireValue(t, requestsCh, 5*time.Second) - assert.Eventually(t, func() bool { return metrics.failedSendCount == 2 }, 5*time.Second, 10*time.Millisecond) - assert.Equal(t, 0, metrics.sentCount) - assert.Equal(t, 503, metrics.lastFailedSendMeta.StatusCode) + assert.Eventually(t, func() bool 
{ return metrics.getFailedSendCount() == 2 }, 5*time.Second, 10*time.Millisecond) + assert.Equal(t, 0, metrics.getSentCount()) + assert.Equal(t, 503, metrics.getLastFailedSendMeta().StatusCode) }) } From bdfc71e24938d7c82f8204ff616641fd11a568c2 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 1 Apr 2026 16:35:38 -0400 Subject: [PATCH 06/10] fix: Reset pending events gauge when publisher is disabled When the publisher receives an unrecoverable failure and clears its queues, the pending events gauge was not being reset to 0, leaving a stale non-zero value in the metric. --- internal/events/event_publisher.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/events/event_publisher.go b/internal/events/event_publisher.go index 6ed78a19..e97d2cbd 100644 --- a/internal/events/event_publisher.go +++ b/internal/events/event_publisher.go @@ -229,6 +229,9 @@ func NewHTTPEventPublisher(authKey credential.SDKCredential, httpConfig httpconf // Ensure we free up as much memory as we can by clearing any pending events p.queues = make(map[EventPayloadMetadata]*publisherQueue) p.disabled = true + if p.eventMetrics != nil { + p.eventMetrics.RecordPendingEvents(0) + } case e := <-inputQueue: if p.disabled { continue From 18ccc0694f7a52ba9206cba01f48539a65b3bcf7 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 1 Apr 2026 16:52:38 -0400 Subject: [PATCH 07/10] fix: Fix gofmt formatting and prealloc lint issues Run gofmt on metrics package files and preallocate the opts slice in newEventVerbatimRelay with the correct capacity. 
--- internal/events/event-relay.go | 5 +++-- internal/metrics/constants.go | 2 +- internal/metrics/measures.go | 4 ++-- internal/metrics/metrics.go | 2 +- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/events/event-relay.go b/internal/events/event-relay.go index 361a0d01..d89c83ad 100644 --- a/internal/events/event-relay.go +++ b/internal/events/event-relay.go @@ -310,12 +310,13 @@ func newEventVerbatimRelay( extraOptions ...OptionType, ) *eventVerbatimRelay { eventsURI := getEventsURI(config) - opts := []OptionType{ + opts := make([]OptionType, 0, 4+len(extraOptions)) + opts = append(opts, OptionCapacity(config.Capacity.GetOrElse(c.DefaultEventCapacity)), OptionBaseURI(eventsURI), OptionURIPath(remotePath), OptionFlushInterval(config.FlushInterval.GetOrElse(c.DefaultEventsFlushInterval)), - } + ) opts = append(opts, extraOptions...) publisher, _ := NewHTTPEventPublisher(authKey, httpConfig, loggers, opts...) diff --git a/internal/metrics/constants.go b/internal/metrics/constants.go index bfc39fa0..c8fa245f 100644 --- a/internal/metrics/constants.go +++ b/internal/metrics/constants.go @@ -17,7 +17,7 @@ const ( eventsSentBytesMeasureName = "launchdarkly.relay.events.sent.bytes" eventsSentFailuresMeasureName = "launchdarkly.relay.events.sent.failures" eventsSentDroppedMeasureName = "launchdarkly.relay.events.sent.dropped" - eventsSentPendingMeasureName = "launchdarkly.relay.events.sent.pending" + eventsSentPendingMeasureName = "launchdarkly.relay.events.sent.pending" defaultFlushInterval = time.Minute diff --git a/internal/metrics/measures.go b/internal/metrics/measures.go index 01f00455..8f9654a9 100644 --- a/internal/metrics/measures.go +++ b/internal/metrics/measures.go @@ -20,7 +20,7 @@ type Instruments struct { eventsSent metric.Int64Counter // cumulative count of events successfully sent eventsFailedSend metric.Int64Counter // cumulative count of events that failed to send eventsBytesSent metric.Int64Counter // cumulative bytes of event 
payloads successfully sent - pendingEvents metric.Int64Gauge // current number of events pending delivery + pendingEvents metric.Int64Gauge // current number of events pending delivery } // Measure identifies what to record. Each pre-defined Measure var specifies which @@ -111,7 +111,7 @@ func NewInstrumentsForTest(meter metric.Meter) (*Instruments, error) { eventsSent: eventsSent, eventsFailedSend: eventsFailedSend, eventsBytesSent: eventsBytesSent, - pendingEvents: pendingEvents, + pendingEvents: pendingEvents, }, nil } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 1fb8b7be..cfeb6796 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -127,7 +127,7 @@ func NewManager( eventsSent: eventsSent, eventsFailedSend: eventsFailedSend, eventsBytesSent: eventsBytesSent, - pendingEvents: pendingEvents, + pendingEvents: pendingEvents, } usageChan := make(chan any, 256) From dd3e4af3a0f86dfe74774e103daa33c5b42a1d24 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 1 Apr 2026 17:04:26 -0400 Subject: [PATCH 08/10] fix: Pre-compute attribute set in EventMetricsRecorder to avoid data race attribute.NewSet() sorts the input slice in place, which caused a data race when multiple goroutines concurrently accessed the shared envKVs slice (the event loop goroutine via RecordPendingEvents and HTTP handler goroutines via buildRequestAttributes). Fix by making a private copy of envKVs at recorder construction time and pre-computing the attribute.Set once. Methods that only need env attributes reuse the pre-computed set. RecordEventsFailedSend still builds a fresh set per call since it adds the statusCode attribute, but copies from the recorder's private slice which is safe. 
--- internal/metrics/measures.go | 19 ++++++++++--------- internal/metrics/metrics.go | 11 ++++++++++- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/internal/metrics/measures.go b/internal/metrics/measures.go index 8f9654a9..dc5d3359 100644 --- a/internal/metrics/measures.go +++ b/internal/metrics/measures.go @@ -211,13 +211,18 @@ func RecordRequestDuration(ctx context.Context, instruments *Instruments, em *En instruments.requestDuration.Record(ctx, duration.Seconds(), metric.WithAttributeSet(attrs)) } +// EventMetricsRecorder implements the EventMetrics interface defined in both the events package +// and go-sdk-events, recording event processing metrics via OTEL instruments. It uses +// environment-level attributes only (relayId, env) since event drops occur asynchronously, +// detached from any specific HTTP request context. // EventMetricsRecorder implements the EventMetrics interface defined in both the events package // and go-sdk-events, recording event processing metrics via OTEL instruments. It uses // environment-level attributes only (relayId, env) since event drops occur asynchronously, // detached from any specific HTTP request context. type EventMetricsRecorder struct { instruments *Instruments - envKVs []attribute.KeyValue + envKVs []attribute.KeyValue // private copy, safe for concurrent read + envAttrs attribute.Set // pre-computed to avoid concurrent sort in attribute.NewSet } // RecordDroppedEvents records the number of events dropped due to capacity overflow. @@ -225,8 +230,7 @@ func (r *EventMetricsRecorder) RecordDroppedEvents(count int) { if r.instruments == nil || count <= 0 { return } - attrs := attribute.NewSet(r.envKVs...) 
- r.instruments.eventsDropped.Add(context.Background(), int64(count), metric.WithAttributeSet(attrs)) + r.instruments.eventsDropped.Add(context.Background(), int64(count), metric.WithAttributeSet(r.envAttrs)) } // RecordEventsSent records the number of events successfully delivered to the events service. @@ -234,8 +238,7 @@ func (r *EventMetricsRecorder) RecordEventsSent(count int) { if r.instruments == nil || count <= 0 { return } - attrs := attribute.NewSet(r.envKVs...) - r.instruments.eventsSent.Add(context.Background(), int64(count), metric.WithAttributeSet(attrs)) + r.instruments.eventsSent.Add(context.Background(), int64(count), metric.WithAttributeSet(r.envAttrs)) } // RecordPendingEvents records the current number of events pending delivery. @@ -243,8 +246,7 @@ func (r *EventMetricsRecorder) RecordPendingEvents(depth int) { if r.instruments == nil { return } - attrs := attribute.NewSet(r.envKVs...) - r.instruments.pendingEvents.Record(context.Background(), int64(depth), metric.WithAttributeSet(attrs)) + r.instruments.pendingEvents.Record(context.Background(), int64(depth), metric.WithAttributeSet(r.envAttrs)) } // RecordEventsBytesSent records the size of event payloads successfully delivered. @@ -252,8 +254,7 @@ func (r *EventMetricsRecorder) RecordEventsBytesSent(bytes int) { if r.instruments == nil || bytes <= 0 { return } - attrs := attribute.NewSet(r.envKVs...) - r.instruments.eventsBytesSent.Add(context.Background(), int64(bytes), metric.WithAttributeSet(attrs)) + r.instruments.eventsBytesSent.Add(context.Background(), int64(bytes), metric.WithAttributeSet(r.envAttrs)) } // RecordEventsFailedSend records the number of events that could not be delivered after all retries. 
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index cfeb6796..f7bca64b 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -294,8 +294,17 @@ func (em *EnvironmentManager) GetAttributes() attribute.Set { // NewEventMetricsRecorder creates an EventMetricsRecorder that records event processing metrics // with this environment's attributes. The returned recorder satisfies the EventMetrics interfaces // defined in both the events package and go-sdk-events. +// +// The recorder makes a private copy of the environment attributes to avoid data races with +// attribute.NewSet's in-place sort. func (em *EnvironmentManager) NewEventMetricsRecorder(instruments *Instruments) *EventMetricsRecorder { - return &EventMetricsRecorder{instruments: instruments, envKVs: em.envKVs} + envKVsCopy := make([]attribute.KeyValue, len(em.envKVs)) + copy(envKVsCopy, em.envKVs) + return &EventMetricsRecorder{ + instruments: instruments, + envKVs: envKVsCopy, + envAttrs: attribute.NewSet(envKVsCopy...), + } } // FlushEventsExporter is used in testing to trigger the collector to post data to the event publisher. From aae1452389fce849b828e7baccaa37ac44643788 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Wed, 1 Apr 2026 17:15:41 -0400 Subject: [PATCH 09/10] refactor: Rename eventsIngestedBytes to eventsReceivedBytes Align the struct field, function, and doc comment names with the renamed metric (events.received.bytes). Also remove a duplicated doc comment on EventMetricsRecorder. 
--- docs/metrics.md | 4 ++-- internal/metrics/measures.go | 16 ++++++---------- internal/metrics/metrics.go | 6 +++--- internal/metrics/metrics_test.go | 10 +++++----- internal/middleware/metrics_middleware.go | 4 ++-- 5 files changed, 18 insertions(+), 22 deletions(-) diff --git a/docs/metrics.md b/docs/metrics.md index 5611f1da..f87fa938 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -11,7 +11,7 @@ The Relay Proxy can export metrics via [OpenTelemetry Protocol (OTLP)](https://o | `launchdarkly.relay.connections` | UpDownCounter | The number of currently active stream connections from SDKs to the Relay Proxy. | | `launchdarkly.relay.requests` | Counter | The cumulative number of requests received by the Relay Proxy's [service endpoints](./endpoints.md) since startup. | | `launchdarkly.relay.request.duration` | Histogram | The duration of requests to the Relay Proxy's service endpoints, in seconds. | -| `launchdarkly.relay.events.ingested.bytes` | Counter | The cumulative number of event bytes ingested by the Relay Proxy (measured after decompression). | +| `launchdarkly.relay.events.received.bytes` | Counter | The cumulative number of event bytes received by the Relay Proxy (measured after decompression). | ## Attributes @@ -42,7 +42,7 @@ OTEL_EXPORTER_OTLP_ENDPOINT=http://:9090/api/v1/otlp/v1/metrics OTEL_EXPORTER_OTLP_PROTOCOL=http ``` -Prometheus converts OpenTelemetry metric names by replacing dots with underscores, so the metrics will appear as `launchdarkly_relay_connections`, `launchdarkly_relay_requests_total`, `launchdarkly_relay_request_duration_seconds`, and `launchdarkly_relay_events_ingested_bytes_total`. +Prometheus converts OpenTelemetry metric names by replacing dots with underscores, so the metrics will appear as `launchdarkly_relay_connections`, `launchdarkly_relay_requests_total`, `launchdarkly_relay_request_duration_seconds`, and `launchdarkly_relay_events_received_bytes_total`. 
### OpenTelemetry Collector diff --git a/internal/metrics/measures.go b/internal/metrics/measures.go index dc5d3359..243e6507 100644 --- a/internal/metrics/measures.go +++ b/internal/metrics/measures.go @@ -15,7 +15,7 @@ type Instruments struct { connections metric.Int64UpDownCounter // active connections (+1/-1) requests metric.Int64Counter // cumulative HTTP requests requestDuration metric.Float64Histogram // request duration in seconds - eventsIngestedBytes metric.Int64Counter // cumulative bytes of event data ingested + eventsReceivedBytes metric.Int64Counter // cumulative bytes of event data received eventsDropped metric.Int64Counter // cumulative count of events dropped due to capacity overflow eventsSent metric.Int64Counter // cumulative count of events successfully sent eventsFailedSend metric.Int64Counter // cumulative count of events that failed to send @@ -78,7 +78,7 @@ func NewInstrumentsForTest(meter metric.Meter) (*Instruments, error) { if err != nil { return nil, err } - eventsIngestedBytes, err := meter.Int64Counter(eventsReceivedBytesMeasureName) + eventsReceivedBytes, err := meter.Int64Counter(eventsReceivedBytesMeasureName) if err != nil { return nil, err } @@ -106,7 +106,7 @@ func NewInstrumentsForTest(meter metric.Meter) (*Instruments, error) { connections: connections, requests: requests, requestDuration: requestDuration, - eventsIngestedBytes: eventsIngestedBytes, + eventsReceivedBytes: eventsReceivedBytes, eventsDropped: eventsDropped, eventsSent: eventsSent, eventsFailedSend: eventsFailedSend, @@ -191,14 +191,14 @@ func WithRouteCount(ctx context.Context, em *EnvironmentManager, instruments *In f() } -// RecordEventsIngestedBytes records the number of event bytes ingested. -func RecordEventsIngestedBytes(ctx context.Context, instruments *Instruments, em *EnvironmentManager, platformCategory string, ri RequestInfo, bytes int64) { +// RecordEventsReceivedBytes records the number of event bytes received. 
+func RecordEventsReceivedBytes(ctx context.Context, instruments *Instruments, em *EnvironmentManager, platformCategory string, ri RequestInfo, bytes int64) { if em == nil || instruments == nil || bytes <= 0 { return } ua, wrapper, route, method, appID, appVersion, instanceID := ri.sanitized() attrs := buildRequestAttributes(em.envKVs, platformCategory, ua, wrapper, route, method, appID, appVersion, instanceID) - instruments.eventsIngestedBytes.Add(ctx, bytes, metric.WithAttributeSet(attrs)) + instruments.eventsReceivedBytes.Add(ctx, bytes, metric.WithAttributeSet(attrs)) } // RecordRequestDuration records a request duration measurement with the given attributes. @@ -211,10 +211,6 @@ func RecordRequestDuration(ctx context.Context, instruments *Instruments, em *En instruments.requestDuration.Record(ctx, duration.Seconds(), metric.WithAttributeSet(attrs)) } -// EventMetricsRecorder implements the EventMetrics interface defined in both the events package -// and go-sdk-events, recording event processing metrics via OTEL instruments. It uses -// environment-level attributes only (relayId, env) since event drops occur asynchronously, -// detached from any specific HTTP request context. // EventMetricsRecorder implements the EventMetrics interface defined in both the events package // and go-sdk-events, recording event processing metrics via OTEL instruments. 
It uses // environment-level attributes only (relayId, env) since event drops occur asynchronously, diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index f7bca64b..572be73c 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -102,8 +102,8 @@ func NewManager( requestDuration, _ := meter.Float64Histogram(requestDurationMeasureName, otelmetric.WithDescription("request duration in seconds"), otelmetric.WithUnit("s")) - eventsIngestedBytes, _ := meter.Int64Counter(eventsReceivedBytesMeasureName, - otelmetric.WithDescription("cumulative bytes of event data ingested"), + eventsReceivedBytes, _ := meter.Int64Counter(eventsReceivedBytesMeasureName, + otelmetric.WithDescription("cumulative bytes of event data received"), otelmetric.WithUnit("By")) eventsDropped, _ := meter.Int64Counter(eventsSentDroppedMeasureName, @@ -122,7 +122,7 @@ func NewManager( connections: connections, requests: requests, requestDuration: requestDuration, - eventsIngestedBytes: eventsIngestedBytes, + eventsReceivedBytes: eventsReceivedBytes, eventsDropped: eventsDropped, eventsSent: eventsSent, eventsFailedSend: eventsFailedSend, diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index 9e0b1f4b..3d4954f9 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -33,7 +33,7 @@ func TestNewManagerReturnsInstruments(t *testing.T) { assert.NotNil(t, instruments.connections) assert.NotNil(t, instruments.requests) assert.NotNil(t, instruments.requestDuration) - assert.NotNil(t, instruments.eventsIngestedBytes) + assert.NotNil(t, instruments.eventsReceivedBytes) } func TestAddEnvironmentWithoutEventPublisher(t *testing.T) { @@ -177,14 +177,14 @@ func TestWithRouteCount(t *testing.T) { }) } -func TestRecordEventsIngestedBytes(t *testing.T) { +func TestRecordEventsReceivedBytes(t *testing.T) { testWithOTel(t, func(p testWithOTelParams) { - RecordEventsIngestedBytes(context.Background(), p.instruments, 
p.env, ServerPlatformCategory, RequestInfo{UserAgent: userAgentValue, Route: "/bulk", Method: "POST"}, 1024) + RecordEventsReceivedBytes(context.Background(), p.instruments, p.env, ServerPlatformCategory, RequestInfo{UserAgent: userAgentValue, Route: "/bulk", Method: "POST"}, 1024) rm, err := p.collectMetrics() require.NoError(t, err) m := findMetric(rm, eventsReceivedBytesMeasureName) - require.NotNil(t, m, "events ingested bytes metric not found") + require.NotNil(t, m, "events received bytes metric not found") sum, ok := m.Data.(metricdata.Sum[int64]) require.True(t, ok, "expected Sum[int64] data") require.NotEmpty(t, sum.DataPoints) @@ -197,7 +197,7 @@ func TestRecordEventsIngestedBytes(t *testing.T) { found = true } } - assert.True(t, found, "expected data point for events ingested bytes") + assert.True(t, found, "expected data point for events received bytes") }) } diff --git a/internal/middleware/metrics_middleware.go b/internal/middleware/metrics_middleware.go index f6b42e0f..602fdd59 100644 --- a/internal/middleware/metrics_middleware.go +++ b/internal/middleware/metrics_middleware.go @@ -168,7 +168,7 @@ func RequestMetrics(measure metrics.Measure) mux.MiddlewareFunc { } } -// EventBytesMetrics is a middleware function that records the number of event bytes ingested. +// EventBytesMetrics is a middleware function that records the number of event bytes received. // This should be applied after GzipMiddleware so that it measures decompressed bytes. 
func EventBytesMetrics(platformCategory string) mux.MiddlewareFunc { return func(next http.Handler) http.Handler { @@ -182,7 +182,7 @@ func EventBytesMetrics(platformCategory string) mux.MiddlewareFunc { next.ServeHTTP(w, req) env := GetEnvContextInfo(req.Context()).Env ri := requestInfoFromHTTP(req) - metrics.RecordEventsIngestedBytes(req.Context(), getInstruments(env), env.GetMetricsEnv(), platformCategory, ri, cr.bytesRead.Load()) + metrics.RecordEventsReceivedBytes(req.Context(), getInstruments(env), env.GetMetricsEnv(), platformCategory, ri, cr.bytesRead.Load()) }) } } From 26f084d44c5e7a71cce6139a2668de2f517141cd Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 2 Apr 2026 11:20:14 -0400 Subject: [PATCH 10/10] refactor: Add NoOpEventMetrics to eliminate nil checks Add a no-op implementation of EventMetrics used as the default when no metrics implementation is provided. Removes all nil checks at call sites in the publisher and dispatcher, following the null object pattern. --- internal/events/event-relay.go | 7 ++----- internal/events/event_metrics.go | 11 ++++++++++ internal/events/event_publisher.go | 33 +++++++++++++----------------- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/internal/events/event-relay.go b/internal/events/event-relay.go index d89c83ad..c657301a 100644 --- a/internal/events/event-relay.go +++ b/internal/events/event-relay.go @@ -171,11 +171,8 @@ func (r *analyticsEventEndpointDispatcher) getVerbatimRelay() *eventVerbatimRela r.mu.Lock() defer r.mu.Unlock() if r.verbatimRelay == nil { - var extraOpts []OptionType - if r.eventMetrics != nil { - extraOpts = append(extraOpts, OptionEventMetrics{EventMetrics: r.eventMetrics}) - } - r.verbatimRelay = newEventVerbatimRelay(r.authKey, r.config, r.httpConfig, r.loggers, r.remotePath, extraOpts...) 
+ r.verbatimRelay = newEventVerbatimRelay(r.authKey, r.config, r.httpConfig, r.loggers, r.remotePath, + OptionEventMetrics{EventMetrics: r.eventMetrics}) } return r.verbatimRelay } diff --git a/internal/events/event_metrics.go b/internal/events/event_metrics.go index e07ae96f..a9fcf28f 100644 --- a/internal/events/event_metrics.go +++ b/internal/events/event_metrics.go @@ -37,3 +37,14 @@ type EventMetrics interface { // buffered awaiting delivery. The count parameter is the current total number of events pending. RecordPendingEvents(count int) } + +// NoOpEventMetrics is a default implementation of EventMetrics that does nothing. +// It is used when no EventMetrics is provided, eliminating the need for nil checks +// at every call site. +type NoOpEventMetrics struct{} + +func (NoOpEventMetrics) RecordDroppedEvents(int) {} +func (NoOpEventMetrics) RecordEventsSent(int) {} +func (NoOpEventMetrics) RecordEventsFailedSend(int, EventSendFailureMetadata) {} +func (NoOpEventMetrics) RecordEventsBytesSent(int) {} +func (NoOpEventMetrics) RecordPendingEvents(int) {} diff --git a/internal/events/event_publisher.go b/internal/events/event_publisher.go index e97d2cbd..833f40d6 100644 --- a/internal/events/event_publisher.go +++ b/internal/events/event_publisher.go @@ -209,6 +209,9 @@ func NewHTTPEventPublisher(authKey credential.SDKCredential, httpConfig httpconf } } + if p.eventMetrics == nil { + p.eventMetrics = NoOpEventMetrics{} + } p.queues = make(map[EventPayloadMetadata]*publisherQueue) p.wg.Add(1) @@ -229,9 +232,7 @@ func NewHTTPEventPublisher(authKey credential.SDKCredential, httpConfig httpconf // Ensure we free up as much memory as we can by clearing any pending events p.queues = make(map[EventPayloadMetadata]*publisherQueue) p.disabled = true - if p.eventMetrics != nil { - p.eventMetrics.RecordPendingEvents(0) - } + p.eventMetrics.RecordPendingEvents(0) case e := <-inputQueue: if p.disabled { continue @@ -272,16 +273,14 @@ func (p *HTTPEventPublisher) append(batch 
eventBatch) { p.overflowed = true } taken = available - if dropped := len(batch.events) - taken; dropped > 0 && p.eventMetrics != nil { + if dropped := len(batch.events) - taken; dropped > 0 { p.eventMetrics.RecordDroppedEvents(dropped) } } else { p.overflowed = false } queue.events = append(queue.events, batch.events[:taken]...) - if p.eventMetrics != nil { - p.eventMetrics.RecordPendingEvents(p.totalPendingEvents()) - } + p.eventMetrics.RecordPendingEvents(p.totalPendingEvents()) } func (p *HTTPEventPublisher) totalPendingEvents() int { @@ -381,15 +380,13 @@ func (p *HTTPEventPublisher) flush() { EnableCompression: true, } result := ldevents.SendEventDataWithRetry(sendConfig, ldevents.AnalyticsEventDataKind, p.uriPath, payload, count) - if eventMetrics != nil { - if result.Success { - eventMetrics.RecordEventsSent(count) - eventMetrics.RecordEventsBytesSent(len(payload)) - } else { - eventMetrics.RecordEventsFailedSend(count, EventSendFailureMetadata{ - StatusCode: result.StatusCode, - }) - } + if result.Success { + eventMetrics.RecordEventsSent(count) + eventMetrics.RecordEventsBytesSent(len(payload)) + } else { + eventMetrics.RecordEventsFailedSend(count, EventSendFailureMetadata{ + StatusCode: result.StatusCode, + }) } p.wg.Done() if result.MustShutDown { @@ -397,9 +394,7 @@ func (p *HTTPEventPublisher) flush() { } }() } - if p.eventMetrics != nil { - p.eventMetrics.RecordPendingEvents(p.totalPendingEvents()) - } + p.eventMetrics.RecordPendingEvents(p.totalPendingEvents()) } func (p *HTTPEventPublisher) Close() { //nolint:revive // method is already documented in interface