diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4c03c497..f4391f2b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,6 +47,9 @@ jobs: integration-test: needs: go-versions uses: ./.github/workflows/integration-test.yml + permissions: + id-token: write + contents: read with: environment: 'staging' go-version: ${{ needs.go-versions.outputs.latest }} diff --git a/docs/metrics.md b/docs/metrics.md index 5611f1da..f87fa938 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -11,7 +11,7 @@ The Relay Proxy can export metrics via [OpenTelemetry Protocol (OTLP)](https://o | `launchdarkly.relay.connections` | UpDownCounter | The number of currently active stream connections from SDKs to the Relay Proxy. | | `launchdarkly.relay.requests` | Counter | The cumulative number of requests received by the Relay Proxy's [service endpoints](./endpoints.md) since startup. | | `launchdarkly.relay.request.duration` | Histogram | The duration of requests to the Relay Proxy's service endpoints, in seconds. | -| `launchdarkly.relay.events.ingested.bytes` | Counter | The cumulative number of event bytes ingested by the Relay Proxy (measured after decompression). | +| `launchdarkly.relay.events.received.bytes` | Counter | The cumulative number of event bytes received by the Relay Proxy (measured after decompression). | ## Attributes @@ -42,7 +42,7 @@ OTEL_EXPORTER_OTLP_ENDPOINT=http://:9090/api/v1/otlp/v1/metrics OTEL_EXPORTER_OTLP_PROTOCOL=http ``` -Prometheus converts OpenTelemetry metric names by replacing dots with underscores, so the metrics will appear as `launchdarkly_relay_connections`, `launchdarkly_relay_requests_total`, `launchdarkly_relay_request_duration_seconds`, and `launchdarkly_relay_events_ingested_bytes_total`. +Prometheus converts OpenTelemetry metric names by replacing dots with underscores, so the metrics will appear as `launchdarkly_relay_connections`, `launchdarkly_relay_requests_total`, `launchdarkly_relay_request_duration_seconds`, and `launchdarkly_relay_events_received_bytes_total`. ### OpenTelemetry Collector diff --git a/go.mod b/go.mod index d1acff31..2add4a3e 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/launchdarkly/go-configtypes v1.2.0 github.com/launchdarkly/go-jsonstream/v3 v3.1.0 github.com/launchdarkly/go-sdk-common/v3 v3.4.0 - github.com/launchdarkly/go-sdk-events/v3 v3.5.0 + github.com/launchdarkly/go-sdk-events/v3 v3.6.0 github.com/launchdarkly/go-server-sdk-consul/v3 v3.0.0 github.com/launchdarkly/go-server-sdk-dynamodb/v4 v4.0.0 github.com/launchdarkly/go-server-sdk-evaluation/v3 v3.0.1 @@ -33,7 +33,6 @@ require ( github.com/launchdarkly/go-server-sdk/v7 v7.14.3 github.com/launchdarkly/go-test-helpers/v3 v3.1.0 github.com/pborman/uuid v1.2.1 - github.com/prometheus/client_golang v1.23.2 // override to address CVE-2022-21698 github.com/stretchr/testify v1.11.1 golang.org/x/sync v0.19.0 gopkg.in/gcfg.v1 v1.2.3 @@ -58,21 +57,16 @@ require ( github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b github.com/klauspost/compress v1.18.0 github.com/launchdarkly/api-client-go/v13 v13.0.1-0.20230420175109-f5469391a13e + go.opentelemetry.io/contrib/instrumentation/runtime v0.67.0 go.opentelemetry.io/otel v1.42.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 - go.opentelemetry.io/otel/exporters/prometheus v0.62.0 go.opentelemetry.io/otel/metric v1.42.0 go.opentelemetry.io/otel/sdk v1.42.0 go.opentelemetry.io/otel/sdk/metric v1.42.0 - go.opentelemetry.io/otel/trace v1.42.0 - google.golang.org/grpc v1.78.0 ) require ( - github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -99,19 +93,12 @@ require ( github.com/miekg/dns v1.1.58 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/gomega v1.27.10 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.67.5 // indirect - github.com/prometheus/otlptranslator v1.0.0 // indirect - github.com/prometheus/procfs v0.19.2 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/contrib/instrumentation/runtime v0.67.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect + go.opentelemetry.io/otel/trace v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect - go.yaml.in/yaml/v2 v2.4.3 // indirect golang.org/x/crypto v0.47.0 // indirect golang.org/x/net v0.49.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect @@ -119,6 +106,7 @@ require ( golang.org/x/text v0.33.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect + google.golang.org/grpc v1.78.0 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 01ee6ba6..a0c37c4f 100644 --- a/go.sum +++ b/go.sum @@ -78,7 +78,6 @@ github.com/aws/smithy-go v1.19.0 h1:KWFKQV80DpP3vJrrA9sVAHQ5gc2z8i4EzrLhLlWXcBM= github.com/aws/smithy-go v1.19.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= -github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= @@ -265,8 +264,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= -github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/launchdarkly/api-client-go/v13 v13.0.1-0.20230420175109-f5469391a13e h1:PZ8SXmC5B/jTc8FfrWfSGNjy0ieGwqcKPjPV4vMtUqM= github.com/launchdarkly/api-client-go/v13 v13.0.1-0.20230420175109-f5469391a13e/go.mod h1:cQRkOAs0LGcfIs6RSsHNqwhzItUZooyhpqPv0hgiQZM= github.com/launchdarkly/ccache v1.1.0 h1:voD1M+ZJXR3MREOKtBwgTF9hYHl1jg+vFKS/+VAkR2k= @@ -283,8 +280,8 @@ github.com/launchdarkly/go-ntlmssp v1.0.3 h1:rFxOnnEJ2DzJ+NU0plhXqnldJUwn3wWJFTW github.com/launchdarkly/go-ntlmssp v1.0.3/go.mod h1:P1z6fX/y9zgBvfnZP7AKWilW9AX5M3czsa1S4Zpp2nM= github.com/launchdarkly/go-sdk-common/v3 v3.4.0 h1:GTRulE0G43xdWY1QdjAXJ7QnZ8PMFU8pOWZICCydEtM= github.com/launchdarkly/go-sdk-common/v3 v3.4.0/go.mod h1:6MNeeP8b2VtsM6I3TbShCHW/+tYh2c+p5dB+ilS69sg= -github.com/launchdarkly/go-sdk-events/v3 v3.5.0 h1:Yav8Thm70dZbO8U1foYwZPf3w60n/lNBRaYeeNM/qg4= -github.com/launchdarkly/go-sdk-events/v3 v3.5.0/go.mod h1:oepYWQ2RvvjfL2WxkE1uJJIuRsIMOP4WIVgUpXRPcNI= +github.com/launchdarkly/go-sdk-events/v3 v3.6.0 h1:6AeoJot+bfptz4gtwPvPOXwwEGj9K1SLEGERPRaoNKU= +github.com/launchdarkly/go-sdk-events/v3 v3.6.0/go.mod h1:/N3ZUpfd1a+NrnJiO+812WrG5RQQNYP9qlgtI4I/IE4= github.com/launchdarkly/go-semver v1.0.3 h1:agIy/RN3SqeQDIfKkl+oFslEdeIs7pgsJBs3CdCcGQM= github.com/launchdarkly/go-semver v1.0.3/go.mod h1:xFmMwXba5Mb+3h72Z+VeSs9ahCvKo2QFUTHRNHVqR28= github.com/launchdarkly/go-server-sdk-consul/v3 v3.0.0 h1:AXmmU4rsMxdA75o4a9p+7Pl3SzdfUCLIw7CM7pBRifE= @@ -333,8 +330,6 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= -github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -361,25 +356,15 @@ github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSg github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= -github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= -github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= -github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= -github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= -github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= -github.com/prometheus/otlptranslator v1.0.0 h1:s0LJW/iN9dkIH+EnhiD3BlkkP5QVIUVEoIwkU+A6qos= -github.com/prometheus/otlptranslator v1.0.0/go.mod h1:vRYWnXvI6aWGpsdY/mOT/cbeVRBlPWtBNDb7kGR3uKM= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= -github.com/prometheus/procfs v0.19.2 h1:zUMhqEW66Ex7OXIiDkll3tl9a1ZdilUOd/F6ZXw4Vws= -github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05ZpYlu+b4J7mw= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= @@ -421,44 +406,22 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/runtime v0.67.0 h1:fM78cKITJ2r08cl+nw5i+hI9zWAu3iak8o1Os/ca2Ck= go.opentelemetry.io/contrib/instrumentation/runtime v0.67.0/go.mod h1:ybmlzIqGcQzwt5lAfi8TpSnHo/CI3yv1Czodmm+OJa8= -go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= -go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 h1:NOyNnS19BF2SUDApbOKbDtWZ0IK7b8FJ2uAGdIWOGb0= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0/go.mod h1:VL6EgVikRLcJa9ftukrHu/ZkkhFBSo1lzvdBC9CF1ss= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0 h1:9y5sHvAxWzft1WQ4BwqcvA+IFVUJ1Ya75mSAUnFEVwE= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0/go.mod h1:eQqT90eR3X5Dbs1g9YSM30RavwLF725Ris5/XSXWvqE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 h1:DvJDOPmSWQHWywQS6lKL+pb8s3gBLOZUtw4N+mavW1I= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0/go.mod h1:EtekO9DEJb4/jRyN4v4Qjc2yA7AtfCBuz2FynRUWTXs= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 h1:wVZXIWjQSeSmMoxF74LzAnpVQOAFDo3pPji9Y4SOFKc= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0/go.mod h1:khvBS2IggMFNwZK/6lEeHg/W57h/IX6J4URh57fuI40= -go.opentelemetry.io/otel/exporters/prometheus v0.62.0 h1:krvC4JMfIOVdEuNPTtQ0ZjCiXrybhv+uOHMfHRmnvVo= -go.opentelemetry.io/otel/exporters/prometheus v0.62.0/go.mod h1:fgOE6FM/swEnsVQCqCnbOfRV4tOnWPg7bVeo4izBuhQ= -go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= -go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI= -go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= -go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo= go.opentelemetry.io/otel/sdk v1.42.0/go.mod h1:rGHCAxd9DAph0joO4W6OPwxjNTYWghRWmkHuGbayMts= -go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= -go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA= go.opentelemetry.io/otel/sdk/metric v1.42.0/go.mod h1:Ua6AAlDKdZ7tdvaQKfSmnFTdHx37+J4ba8MwVCYM5hc= -go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= -go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= -go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -597,8 +560,6 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/internal/events/event-relay.go b/internal/events/event-relay.go index d7f2506d..c657301a 100644 --- a/internal/events/event-relay.go +++ b/internal/events/event-relay.go @@ -45,6 +45,7 @@ type analyticsEventEndpointDispatcher struct { summarizingRelay *eventSummarizingRelay wrapper *datadestination.DataDestinationWrapper eventQueueCleanupInterval time.Duration + eventMetrics EventMetrics loggers ldlog.Loggers mu sync.Mutex } @@ -170,7 +171,8 @@ func (r *analyticsEventEndpointDispatcher) getVerbatimRelay() *eventVerbatimRela r.mu.Lock() defer r.mu.Unlock() if r.verbatimRelay == nil { - r.verbatimRelay = newEventVerbatimRelay(r.authKey, r.config, r.httpConfig, r.loggers, r.remotePath) + r.verbatimRelay = newEventVerbatimRelay(r.authKey, r.config, r.httpConfig, r.loggers, r.remotePath, + OptionEventMetrics{EventMetrics: r.eventMetrics}) } return r.verbatimRelay } @@ -180,7 +182,7 @@ func (r *analyticsEventEndpointDispatcher) getSummarizingRelay() *eventSummarizi defer r.mu.Unlock() if r.summarizingRelay == nil { r.summarizingRelay = newEventSummarizingRelay(r.config, r.httpConfig, r.authKey, r.wrapper, - r.loggers, r.remotePath, r.eventQueueCleanupInterval) + r.loggers, r.remotePath, r.eventQueueCleanupInterval, r.eventMetrics) } return r.summarizingRelay } @@ -206,11 +208,12 @@ func NewEventDispatcher( httpConfig httpconfig.HTTPConfig, wrapper *datadestination.DataDestinationWrapper, eventQueueCleanupInterval time.Duration, // normally zero to use the default; overridden in tests + eventMetrics EventMetrics, ) *EventDispatcher { ep := &EventDispatcher{ analyticsEndpoints: map[basictypes.SDKKind]*analyticsEventEndpointDispatcher{ basictypes.ServerSDK: newAnalyticsEventEndpointDispatcher(sdkKey, - config, httpConfig, wrapper, loggers, "/bulk", eventQueueCleanupInterval), + config, httpConfig, wrapper, loggers, "/bulk", eventQueueCleanupInterval, eventMetrics), }, diagnosticEndpoints: map[basictypes.SDKKind]*diagnosticEventEndpointDispatcher{ basictypes.ServerSDK: newDiagnosticEventEndpointDispatcher(config, httpConfig, loggers, "/diagnostic"), @@ -218,12 +221,12 @@ func NewEventDispatcher( } if mobileKey.Defined() { ep.analyticsEndpoints[basictypes.MobileSDK] = newAnalyticsEventEndpointDispatcher(mobileKey, - config, httpConfig, wrapper, loggers, "/mobile", eventQueueCleanupInterval) + config, httpConfig, wrapper, loggers, "/mobile", eventQueueCleanupInterval, eventMetrics) ep.diagnosticEndpoints[basictypes.MobileSDK] = newDiagnosticEventEndpointDispatcher(config, httpConfig, loggers, "/mobile/events/diagnostic") } if envID.Defined() { ep.analyticsEndpoints[basictypes.JSClientSDK] = newAnalyticsEventEndpointDispatcher(envID, config, httpConfig, wrapper, loggers, - "/events/bulk/"+string(envID), eventQueueCleanupInterval) + "/events/bulk/"+string(envID), eventQueueCleanupInterval, eventMetrics) ep.diagnosticEndpoints[basictypes.JSClientSDK] = newDiagnosticEventEndpointDispatcher(config, httpConfig, loggers, "/events/diagnostic/"+string(envID)) } @@ -280,6 +283,7 @@ func newAnalyticsEventEndpointDispatcher( loggers ldlog.Loggers, remotePath string, eventQueueCleanupInterval time.Duration, + eventMetrics EventMetrics, ) *analyticsEventEndpointDispatcher { return &analyticsEventEndpointDispatcher{ authKey: authKey, @@ -290,6 +294,7 @@ func newAnalyticsEventEndpointDispatcher( loggers: loggers, remotePath: remotePath, eventQueueCleanupInterval: eventQueueCleanupInterval, + eventMetrics: eventMetrics, } } @@ -299,14 +304,17 @@ func newEventVerbatimRelay( httpConfig httpconfig.HTTPConfig, loggers ldlog.Loggers, remotePath string, + extraOptions ...OptionType, ) *eventVerbatimRelay { eventsURI := getEventsURI(config) - opts := []OptionType{ + opts := make([]OptionType, 0, 4+len(extraOptions)) + opts = append(opts, OptionCapacity(config.Capacity.GetOrElse(c.DefaultEventCapacity)), OptionBaseURI(eventsURI), OptionURIPath(remotePath), OptionFlushInterval(config.FlushInterval.GetOrElse(c.DefaultEventsFlushInterval)), - } + ) + opts = append(opts, extraOptions...) publisher, _ := NewHTTPEventPublisher(authKey, httpConfig, loggers, opts...) diff --git a/internal/events/event-relay_test.go b/internal/events/event-relay_test.go index 5a6687f9..a1e2ded3 100644 --- a/internal/events/event-relay_test.go +++ b/internal/events/event-relay_test.go @@ -122,6 +122,7 @@ func eventRelayTestWithOptions( httpConfig, wrapper, opts.eventQueueCleanupInterval, + nil, ) defer dispatcher.Close() diff --git a/internal/events/event_metrics.go b/internal/events/event_metrics.go new file mode 100644 index 00000000..a9fcf28f --- /dev/null +++ b/internal/events/event_metrics.go @@ -0,0 +1,50 @@ +package events + +import ( + ldevents "github.com/launchdarkly/go-sdk-events/v3" +) + +// EventSendFailureMetadata is an alias for ldevents.EventSendFailureMetadata so that the +// ld-relay EventMetrics interface is structurally identical to the go-sdk-events one. +// This allows a single concrete type to satisfy both interfaces. +type EventSendFailureMetadata = ldevents.EventSendFailureMetadata + +// EventMetrics defines an interface for receiving metrics about event processing. This is used by +// the metrics package to record telemetry (e.g. via OpenTelemetry) about events that are dropped, +// sent, or otherwise processed by the relay's event forwarding pipeline. +// +// This interface mirrors the one in go-sdk-events. The metrics package provides a single concrete +// implementation that satisfies both via Go structural typing. +type EventMetrics interface { + // RecordDroppedEvents is called when events are discarded because the event buffer has reached + // its configured capacity. The count parameter indicates how many events were dropped. + RecordDroppedEvents(count int) + + // RecordEventsSent is called when a batch of events has been successfully delivered to the + // events service. The count parameter indicates how many events were in the batch. + RecordEventsSent(count int) + + // RecordEventsFailedSend is called when a batch of events could not be delivered to the + // events service after all retry attempts. The count parameter indicates how many events + // were in the failed batch. The metadata parameter provides additional context about the failure. + RecordEventsFailedSend(count int, metadata EventSendFailureMetadata) + + // RecordEventsBytesSent is called when a batch of events has been successfully delivered. + // The bytes parameter is the size of the serialized event payload before compression. + RecordEventsBytesSent(bytes int) + + // RecordPendingEvents is called after any operation that changes the number of events + // buffered awaiting delivery. The count parameter is the current total number of events pending. + RecordPendingEvents(count int) +} + +// NoOpEventMetrics is a default implementation of EventMetrics that does nothing. +// It is used when no EventMetrics is provided, eliminating the need for nil checks +// at every call site. +type NoOpEventMetrics struct{} + +func (NoOpEventMetrics) RecordDroppedEvents(int) {} +func (NoOpEventMetrics) RecordEventsSent(int) {} +func (NoOpEventMetrics) RecordEventsFailedSend(int, EventSendFailureMetadata) {} +func (NoOpEventMetrics) RecordEventsBytesSent(int) {} +func (NoOpEventMetrics) RecordPendingEvents(int) {} diff --git a/internal/events/event_publisher.go b/internal/events/event_publisher.go index fd373ced..833f40d6 100644 --- a/internal/events/event_publisher.go +++ b/internal/events/event_publisher.go @@ -103,10 +103,11 @@ type HTTPEventPublisher struct { disableQueue chan interface{} disabled bool - queues map[EventPayloadMetadata]*publisherQueue - capacity int - overflowed bool - lock sync.RWMutex + queues map[EventPayloadMetadata]*publisherQueue + capacity int + overflowed bool + eventMetrics EventMetrics + lock sync.RWMutex } type eventBatch struct { @@ -159,6 +160,17 @@ func (o OptionCapacity) apply(p *HTTPEventPublisher) error { return nil } +// OptionEventMetrics provides an EventMetrics implementation for recording metrics about event +// processing such as dropped event counts. +type OptionEventMetrics struct { + EventMetrics EventMetrics +} + +func (o OptionEventMetrics) apply(p *HTTPEventPublisher) error { + p.eventMetrics = o.EventMetrics + return nil +} + // NewHTTPEventPublisher creates a new HTTPEventPublisher. func NewHTTPEventPublisher(authKey credential.SDKCredential, httpConfig httpconfig.HTTPConfig, loggers ldlog.Loggers, options ...OptionType) (*HTTPEventPublisher, error) { closer := make(chan struct{}) @@ -197,6 +209,9 @@ func NewHTTPEventPublisher(authKey credential.SDKCredential, httpConfig httpconf } } + if p.eventMetrics == nil { + p.eventMetrics = NoOpEventMetrics{} + } p.queues = make(map[EventPayloadMetadata]*publisherQueue) p.wg.Add(1) @@ -217,6 +232,7 @@ func NewHTTPEventPublisher(authKey credential.SDKCredential, httpConfig httpconf // Ensure we free up as much memory as we can by clearing any pending events p.queues = make(map[EventPayloadMetadata]*publisherQueue) p.disabled = true + p.eventMetrics.RecordPendingEvents(0) case e := <-inputQueue: if p.disabled { continue @@ -257,10 +273,22 @@ func (p *HTTPEventPublisher) append(batch eventBatch) { p.overflowed = true } taken = available + if dropped := len(batch.events) - taken; dropped > 0 { + p.eventMetrics.RecordDroppedEvents(dropped) + } } else { p.overflowed = false } queue.events = append(queue.events, batch.events[:taken]...) + p.eventMetrics.RecordPendingEvents(p.totalPendingEvents()) +} + +func (p *HTTPEventPublisher) totalPendingEvents() int { + total := 0 + for _, q := range p.queues { + total += len(q.events) + } + return total } func (p *HTTPEventPublisher) ReplaceCredential(newCredential credential.SDKCredential) { //nolint:revive // method is already documented in interface @@ -338,6 +366,7 @@ func (p *HTTPEventPublisher) flush() { return ret } + eventMetrics := p.eventMetrics go func() { // The EventSender created by ldevents.NewDefaultEventSender implements the standard retry behavior, // and error logging, in its SendEventData method. Retries could cause this call to block for a while, @@ -351,12 +380,21 @@ func (p *HTTPEventPublisher) flush() { EnableCompression: true, } result := ldevents.SendEventDataWithRetry(sendConfig, ldevents.AnalyticsEventDataKind, p.uriPath, payload, count) + if result.Success { + eventMetrics.RecordEventsSent(count) + eventMetrics.RecordEventsBytesSent(len(payload)) + } else { + eventMetrics.RecordEventsFailedSend(count, EventSendFailureMetadata{ + StatusCode: result.StatusCode, + }) + } p.wg.Done() if result.MustShutDown { p.disableQueue <- struct{}{} } }() } + p.eventMetrics.RecordPendingEvents(p.totalPendingEvents()) } func (p *HTTPEventPublisher) Close() { //nolint:revive // method is already documented in interface diff --git a/internal/events/event_publisher_test.go b/internal/events/event_publisher_test.go index c44e9ba2..308561b3 100644 --- a/internal/events/event_publisher_test.go +++ b/internal/events/event_publisher_test.go @@ -6,6 +6,7 @@ import ( "net/http/httptest" "sort" "strconv" + "sync" "testing" "time" @@ -189,6 +190,195 @@ func TestHTTPEventPublisherCapacity(t *testing.T) { }) } +type mockEventMetrics struct { + mu sync.Mutex + droppedCount int + sentCount int + failedSendCount int + lastFailedSendMeta EventSendFailureMetadata + bytesSent int + lastPendingEvents int +} + +func (m *mockEventMetrics) RecordDroppedEvents(count int) { + m.mu.Lock() + defer m.mu.Unlock() + m.droppedCount += count +} + +func (m *mockEventMetrics) RecordEventsSent(count int) { + m.mu.Lock() + defer m.mu.Unlock() + m.sentCount += count +} + +func (m *mockEventMetrics) RecordEventsFailedSend(count int, metadata EventSendFailureMetadata) { + m.mu.Lock() + defer m.mu.Unlock() + m.failedSendCount += count + m.lastFailedSendMeta = metadata +} + +func (m *mockEventMetrics) RecordEventsBytesSent(bytes int) { + m.mu.Lock() + defer m.mu.Unlock() + m.bytesSent += bytes +} + +func (m *mockEventMetrics) RecordPendingEvents(depth int) { + m.mu.Lock() + defer m.mu.Unlock() + m.lastPendingEvents = depth +} + +func (m *mockEventMetrics) getDroppedCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.droppedCount +} + +func (m *mockEventMetrics) getSentCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.sentCount +} + +func (m *mockEventMetrics) getFailedSendCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.failedSendCount +} + +func (m *mockEventMetrics) getLastFailedSendMeta() EventSendFailureMetadata { + m.mu.Lock() + defer m.mu.Unlock() + return m.lastFailedSendMeta +} + +func (m *mockEventMetrics) getBytesSent() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.bytesSent +} + +func (m *mockEventMetrics) getLastPendingEvents() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.lastPendingEvents +} + +func TestHTTPEventPublisherDroppedEventsMetric(t *testing.T) { + mockLog := ldlogtest.NewMockLog() + defer mockLog.DumpIfTestFailed(t) + handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(202)) + httphelpers.WithServer(handler, func(server *httptest.Server) { + metrics := &mockEventMetrics{} + publisher, _ := NewHTTPEventPublisher(config.SDKKey("my-key"), defaultHTTPConfig(), mockLog.Loggers, + OptionBaseURI(server.URL), OptionCapacity(2), OptionEventMetrics{EventMetrics: metrics}) + defer publisher.Close() + + // Publish 5 events with capacity 2 — should drop 3 + publisher.Publish(EventPayloadMetadata{}, + json.RawMessage(`"a"`), json.RawMessage(`"b"`), + json.RawMessage(`"c"`), json.RawMessage(`"d"`), json.RawMessage(`"e"`)) + publisher.Flush() + + r := helpers.RequireValue(t, requestsCh, time.Second) + uncompressed, err := util.DecompressGzipData(r.Body) + assert.NoError(t, err) + m.In(t).Assert(uncompressed, m.JSONStrEqual(`["a","b"]`)) + + assert.Equal(t, 3, metrics.getDroppedCount()) + }) +} + +func TestHTTPEventPublisherEventsSentMetric(t *testing.T) { + mockLog := ldlogtest.NewMockLog() + defer mockLog.DumpIfTestFailed(t) + handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(202)) + httphelpers.WithServer(handler, func(server *httptest.Server) { + metrics := &mockEventMetrics{} + publisher, _ := NewHTTPEventPublisher(config.SDKKey("my-key"), defaultHTTPConfig(), mockLog.Loggers, + OptionBaseURI(server.URL), OptionEventMetrics{EventMetrics: metrics}) + defer publisher.Close() + + publisher.Publish(EventPayloadMetadata{}, json.RawMessage(`"a"`), json.RawMessage(`"b"`), json.RawMessage(`"c"`)) + publisher.Flush() + + _ = helpers.RequireValue(t, requestsCh, time.Second) + // Wait for the goroutine to record the metric after the send completes + assert.Eventually(t, func() bool { return metrics.getSentCount() == 3 }, time.Second, 10*time.Millisecond) + assert.Greater(t, metrics.getBytesSent(), 0) + }) +} + +func TestHTTPEventPublisherPendingEventsMetric(t *testing.T) { + mockLog := ldlogtest.NewMockLog() + defer mockLog.DumpIfTestFailed(t) + handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(202)) + httphelpers.WithServer(handler, func(server *httptest.Server) { + metrics := &mockEventMetrics{} + publisher, _ := NewHTTPEventPublisher(config.SDKKey("my-key"), defaultHTTPConfig(), mockLog.Loggers, + OptionBaseURI(server.URL), OptionEventMetrics{EventMetrics: metrics}) + defer publisher.Close() + + // Publish 3 events — queue depth should be 3 after processing + publisher.Publish(EventPayloadMetadata{}, json.RawMessage(`"a"`), json.RawMessage(`"b"`), json.RawMessage(`"c"`)) + // Flush will clear the queue — depth should go to 0 + publisher.Flush() + + _ = helpers.RequireValue(t, requestsCh, time.Second) + assert.Eventually(t, func() bool { return metrics.getLastPendingEvents() == 0 }, time.Second, 10*time.Millisecond) + }) +} + +func TestHTTPEventPublisherEventsFailedSendMetric(t *testing.T) { + mockLog := ldlogtest.NewMockLog() + defer mockLog.DumpIfTestFailed(t) + // Return 500 twice (exhausts retries) so the send fails + handler, requestsCh := httphelpers.RecordingHandler( + httphelpers.SequentialHandler( + httphelpers.HandlerWithStatus(503), + httphelpers.HandlerWithStatus(503), + ), + ) + httphelpers.WithServer(handler, func(server *httptest.Server) { + metrics := &mockEventMetrics{} + publisher, _ := NewHTTPEventPublisher(config.SDKKey("my-key"), defaultHTTPConfig(), mockLog.Loggers, + OptionBaseURI(server.URL), OptionEventMetrics{EventMetrics: metrics}) + defer publisher.Close() + + publisher.Publish(EventPayloadMetadata{}, json.RawMessage(`"a"`), json.RawMessage(`"b"`)) + publisher.Flush() + + // Wait for both retry attempts + _ = helpers.RequireValue(t, requestsCh, 5*time.Second) + _ = helpers.RequireValue(t, requestsCh, 5*time.Second) + + assert.Eventually(t, func() bool { return metrics.getFailedSendCount() == 2 }, 5*time.Second, 10*time.Millisecond) + assert.Equal(t, 0, metrics.getSentCount()) + assert.Equal(t, 503, metrics.getLastFailedSendMeta().StatusCode) + }) +} + +func TestHTTPEventPublisherDroppedEventsMetricNilIsNoOp(t *testing.T) { + mockLog := ldlogtest.NewMockLog() + defer mockLog.DumpIfTestFailed(t) + handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(202)) + httphelpers.WithServer(handler, func(server *httptest.Server) { + // No EventMetrics option — should not panic + publisher, _ := NewHTTPEventPublisher(config.SDKKey("my-key"), defaultHTTPConfig(), mockLog.Loggers, + OptionBaseURI(server.URL), OptionCapacity(1)) + defer publisher.Close() + publisher.Publish(EventPayloadMetadata{}, json.RawMessage(`"hello"`), json.RawMessage(`"goodbye"`)) + publisher.Flush() + r := helpers.RequireValue(t, requestsCh, time.Second) + uncompressed, err := util.DecompressGzipData(r.Body) + assert.NoError(t, err) + m.In(t).Assert(uncompressed, m.JSONStrEqual(`["hello"]`)) + }) +} + func TestHTTPEventPublisherErrorRetry(t *testing.T) { testRecoverableError := func(t *testing.T, errorHandler http.Handler) { mockLog := ldlogtest.NewMockLog() diff --git a/internal/events/summarizing-relay.go b/internal/events/summarizing-relay.go index 58d89bc6..055dc473 100644 --- a/internal/events/summarizing-relay.go +++ b/internal/events/summarizing-relay.go @@ -65,6 +65,7 @@ func newEventSummarizingRelay( loggers ldlog.Loggers, remotePath string, eventQueueCleanupInterval time.Duration, + eventMetrics EventMetrics, ) *eventSummarizingRelay { eventsConfig := ldevents.EventsConfiguration{ Capacity: config.Capacity.GetOrElse(c.DefaultEventCapacity), @@ -72,6 +73,7 @@ func newEventSummarizingRelay( Loggers: loggers, UserKeysCapacity: ldcomponents.DefaultContextKeysCapacity, UserKeysFlushInterval: ldcomponents.DefaultContextKeysFlushInterval, + EventMetrics: eventMetrics, } baseHeaders := make(http.Header) for k, v := range httpConfig.SDKHTTPConfig.DefaultHeaders { diff --git a/internal/metrics/constants.go b/internal/metrics/constants.go index 36c5f6bc..c8fa245f 100644 --- a/internal/metrics/constants.go +++ b/internal/metrics/constants.go @@ -12,7 +12,12 @@ const ( connMeasureName = "launchdarkly.relay.connections" requestMeasureName = "launchdarkly.relay.requests" requestDurationMeasureName = "launchdarkly.relay.request.duration" - eventsIngestedBytesMeasureName = "launchdarkly.relay.events.ingested.bytes" + eventsReceivedBytesMeasureName = "launchdarkly.relay.events.received.bytes" + eventsSentCountMeasureName = "launchdarkly.relay.events.sent.count" + eventsSentBytesMeasureName = "launchdarkly.relay.events.sent.bytes" + eventsSentFailuresMeasureName = "launchdarkly.relay.events.sent.failures" + eventsSentDroppedMeasureName = "launchdarkly.relay.events.sent.dropped" + eventsSentPendingMeasureName = "launchdarkly.relay.events.sent.pending" defaultFlushInterval = time.Minute @@ -32,6 +37,7 @@ var ( applicationIDAttrKey = attribute.Key("application.id") //nolint:gochecknoglobals applicationVersionAttrKey = attribute.Key("application.version") //nolint:gochecknoglobals instanceIDAttrKey = attribute.Key("instanceId") //nolint:gochecknoglobals + statusCodeAttrKey = attribute.Key("statusCode") //nolint:gochecknoglobals ) // buildAttributes creates an OTel attribute set from base key-values plus per-request attributes. diff --git a/internal/metrics/measures.go b/internal/metrics/measures.go index dd8f3b8a..243e6507 100644 --- a/internal/metrics/measures.go +++ b/internal/metrics/measures.go @@ -2,8 +2,11 @@ package metrics import ( "context" + "strconv" "time" + ldevents "github.com/launchdarkly/go-sdk-events/v3" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" ) @@ -12,7 +15,12 @@ type Instruments struct { connections metric.Int64UpDownCounter // active connections (+1/-1) requests metric.Int64Counter // cumulative HTTP requests requestDuration metric.Float64Histogram // request duration in seconds - eventsIngestedBytes metric.Int64Counter // cumulative bytes of event data ingested + eventsReceivedBytes metric.Int64Counter // cumulative bytes of event data received + eventsDropped metric.Int64Counter // cumulative count of events dropped due to capacity overflow + eventsSent metric.Int64Counter // cumulative count of events successfully sent + eventsFailedSend metric.Int64Counter // cumulative count of events that failed to send + eventsBytesSent metric.Int64Counter // cumulative bytes of event payloads successfully sent + pendingEvents metric.Int64Gauge // current number of events pending delivery } // Measure identifies what to record. Each pre-defined Measure var specifies which @@ -70,7 +78,27 @@ func NewInstrumentsForTest(meter metric.Meter) (*Instruments, error) { if err != nil { return nil, err } - eventsIngestedBytes, err := meter.Int64Counter(eventsIngestedBytesMeasureName) + eventsReceivedBytes, err := meter.Int64Counter(eventsReceivedBytesMeasureName) + if err != nil { + return nil, err + } + eventsDropped, err := meter.Int64Counter(eventsSentDroppedMeasureName) + if err != nil { + return nil, err + } + eventsSent, err := meter.Int64Counter(eventsSentCountMeasureName) + if err != nil { + return nil, err + } + eventsFailedSend, err := meter.Int64Counter(eventsSentFailuresMeasureName) + if err != nil { + return nil, err + } + eventsBytesSent, err := meter.Int64Counter(eventsSentBytesMeasureName) + if err != nil { + return nil, err + } + pendingEvents, err := meter.Int64Gauge(eventsSentPendingMeasureName) if err != nil { return nil, err } @@ -78,7 +106,12 @@ func NewInstrumentsForTest(meter metric.Meter) (*Instruments, error) { connections: connections, requests: requests, requestDuration: requestDuration, - eventsIngestedBytes: eventsIngestedBytes, + eventsReceivedBytes: eventsReceivedBytes, + eventsDropped: eventsDropped, + eventsSent: eventsSent, + eventsFailedSend: eventsFailedSend, + eventsBytesSent: eventsBytesSent, + pendingEvents: pendingEvents, }, nil } @@ -158,14 +191,14 @@ func WithRouteCount(ctx context.Context, em *EnvironmentManager, instruments *In f() } -// RecordEventsIngestedBytes records the number of event bytes ingested. -func RecordEventsIngestedBytes(ctx context.Context, instruments *Instruments, em *EnvironmentManager, platformCategory string, ri RequestInfo, bytes int64) { +// RecordEventsReceivedBytes records the number of event bytes received. +func RecordEventsReceivedBytes(ctx context.Context, instruments *Instruments, em *EnvironmentManager, platformCategory string, ri RequestInfo, bytes int64) { if em == nil || instruments == nil || bytes <= 0 { return } ua, wrapper, route, method, appID, appVersion, instanceID := ri.sanitized() attrs := buildRequestAttributes(em.envKVs, platformCategory, ua, wrapper, route, method, appID, appVersion, instanceID) - instruments.eventsIngestedBytes.Add(ctx, bytes, metric.WithAttributeSet(attrs)) + instruments.eventsReceivedBytes.Add(ctx, bytes, metric.WithAttributeSet(attrs)) } // RecordRequestDuration records a request duration measurement with the given attributes. @@ -177,3 +210,58 @@ func RecordRequestDuration(ctx context.Context, instruments *Instruments, em *En attrs := buildRequestAttributes(em.envKVs, measure.platformCategory, ua, wrapper, route, method, appID, appVersion, instanceID) instruments.requestDuration.Record(ctx, duration.Seconds(), metric.WithAttributeSet(attrs)) } + +// EventMetricsRecorder implements the EventMetrics interface defined in both the events package +// and go-sdk-events, recording event processing metrics via OTEL instruments. It uses +// environment-level attributes only (relayId, env) since event drops occur asynchronously, +// detached from any specific HTTP request context. +type EventMetricsRecorder struct { + instruments *Instruments + envKVs []attribute.KeyValue // private copy, safe for concurrent read + envAttrs attribute.Set // pre-computed to avoid concurrent sort in attribute.NewSet +} + +// RecordDroppedEvents records the number of events dropped due to capacity overflow. +func (r *EventMetricsRecorder) RecordDroppedEvents(count int) { + if r.instruments == nil || count <= 0 { + return + } + r.instruments.eventsDropped.Add(context.Background(), int64(count), metric.WithAttributeSet(r.envAttrs)) +} + +// RecordEventsSent records the number of events successfully delivered to the events service. +func (r *EventMetricsRecorder) RecordEventsSent(count int) { + if r.instruments == nil || count <= 0 { + return + } + r.instruments.eventsSent.Add(context.Background(), int64(count), metric.WithAttributeSet(r.envAttrs)) +} + +// RecordPendingEvents records the current number of events pending delivery. +func (r *EventMetricsRecorder) RecordPendingEvents(depth int) { + if r.instruments == nil { + return + } + r.instruments.pendingEvents.Record(context.Background(), int64(depth), metric.WithAttributeSet(r.envAttrs)) +} + +// RecordEventsBytesSent records the size of event payloads successfully delivered. +func (r *EventMetricsRecorder) RecordEventsBytesSent(bytes int) { + if r.instruments == nil || bytes <= 0 { + return + } + r.instruments.eventsBytesSent.Add(context.Background(), int64(bytes), metric.WithAttributeSet(r.envAttrs)) +} + +// RecordEventsFailedSend records the number of events that could not be delivered after all retries. +// The status code from the metadata is included as an attribute on the metric. +func (r *EventMetricsRecorder) RecordEventsFailedSend(count int, metadata ldevents.EventSendFailureMetadata) { + if r.instruments == nil || count <= 0 { + return + } + kvs := make([]attribute.KeyValue, len(r.envKVs), len(r.envKVs)+1) + copy(kvs, r.envKVs) + kvs = append(kvs, statusCodeAttrKey.String(strconv.Itoa(metadata.StatusCode))) + attrs := attribute.NewSet(kvs...) + r.instruments.eventsFailedSend.Add(context.Background(), int64(count), metric.WithAttributeSet(attrs)) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 8eb31d41..572be73c 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -102,15 +102,32 @@ func NewManager( requestDuration, _ := meter.Float64Histogram(requestDurationMeasureName, otelmetric.WithDescription("request duration in seconds"), otelmetric.WithUnit("s")) - eventsIngestedBytes, _ := meter.Int64Counter(eventsIngestedBytesMeasureName, - otelmetric.WithDescription("cumulative bytes of event data ingested"), + eventsReceivedBytes, _ := meter.Int64Counter(eventsReceivedBytesMeasureName, + otelmetric.WithDescription("cumulative bytes of event data received"), otelmetric.WithUnit("By")) + eventsDropped, _ := meter.Int64Counter(eventsSentDroppedMeasureName, + otelmetric.WithDescription("cumulative count of events dropped due to capacity overflow")) + eventsSent, _ := meter.Int64Counter(eventsSentCountMeasureName, + otelmetric.WithDescription("cumulative count of events successfully sent")) + eventsFailedSend, _ := meter.Int64Counter(eventsSentFailuresMeasureName, + otelmetric.WithDescription("cumulative count of events that failed to send after all retries")) + eventsBytesSent, _ := meter.Int64Counter(eventsSentBytesMeasureName, + otelmetric.WithDescription("cumulative bytes of event payloads successfully sent"), + otelmetric.WithUnit("By")) + pendingEvents, _ := meter.Int64Gauge(eventsSentPendingMeasureName, + otelmetric.WithDescription("current number of events buffered in the queue")) + instruments := &Instruments{ connections: connections, requests: requests, requestDuration: requestDuration, - eventsIngestedBytes: eventsIngestedBytes, + eventsReceivedBytes: eventsReceivedBytes, + eventsDropped: eventsDropped, + eventsSent: eventsSent, + eventsFailedSend: eventsFailedSend, + eventsBytesSent: eventsBytesSent, + pendingEvents: pendingEvents, } usageChan := make(chan any, 256) @@ -274,6 +291,22 @@ func (em *EnvironmentManager) GetAttributes() attribute.Set { return attribute.NewSet(em.envKVs...) } +// NewEventMetricsRecorder creates an EventMetricsRecorder that records event processing metrics +// with this environment's attributes. The returned recorder satisfies the EventMetrics interfaces +// defined in both the events package and go-sdk-events. +// +// The recorder makes a private copy of the environment attributes to avoid data races with +// attribute.NewSet's in-place sort. +func (em *EnvironmentManager) NewEventMetricsRecorder(instruments *Instruments) *EventMetricsRecorder { + envKVsCopy := make([]attribute.KeyValue, len(em.envKVs)) + copy(envKVsCopy, em.envKVs) + return &EventMetricsRecorder{ + instruments: instruments, + envKVs: envKVsCopy, + envAttrs: attribute.NewSet(envKVsCopy...), + } +} + // FlushEventsExporter is used in testing to trigger the collector to post data to the event publisher. func (em *EnvironmentManager) FlushEventsExporter() { if em.collector != nil { diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index e45f83d1..3d4954f9 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -33,7 +33,7 @@ func TestNewManagerReturnsInstruments(t *testing.T) { assert.NotNil(t, instruments.connections) assert.NotNil(t, instruments.requests) assert.NotNil(t, instruments.requestDuration) - assert.NotNil(t, instruments.eventsIngestedBytes) + assert.NotNil(t, instruments.eventsReceivedBytes) } func TestAddEnvironmentWithoutEventPublisher(t *testing.T) { @@ -177,14 +177,14 @@ func TestWithRouteCount(t *testing.T) { }) } -func TestRecordEventsIngestedBytes(t *testing.T) { +func TestRecordEventsReceivedBytes(t *testing.T) { testWithOTel(t, func(p testWithOTelParams) { - RecordEventsIngestedBytes(context.Background(), p.instruments, p.env, ServerPlatformCategory, RequestInfo{UserAgent: userAgentValue, Route: "/bulk", Method: "POST"}, 1024) + RecordEventsReceivedBytes(context.Background(), p.instruments, p.env, ServerPlatformCategory, RequestInfo{UserAgent: userAgentValue, Route: "/bulk", Method: "POST"}, 1024) rm, err := p.collectMetrics() require.NoError(t, err) - m := findMetric(rm, eventsIngestedBytesMeasureName) - require.NotNil(t, m, "events ingested bytes metric not found") + m := findMetric(rm, eventsReceivedBytesMeasureName) + require.NotNil(t, m, "events received bytes metric not found") sum, ok := m.Data.(metricdata.Sum[int64]) require.True(t, ok, "expected Sum[int64] data") require.NotEmpty(t, sum.DataPoints) @@ -197,7 +197,33 @@ func TestRecordEventsIngestedBytes(t *testing.T) { found = true } } - assert.True(t, found, "expected data point for events ingested bytes") + assert.True(t, found, "expected data point for events received bytes") + }) +} + +func TestEventMetricsRecorderViaTestHelper(t *testing.T) { + testWithOTel(t, func(p testWithOTelParams) { + recorder := p.env.NewEventMetricsRecorder(p.instruments) + + recorder.RecordDroppedEvents(5) + recorder.RecordEventsSent(10) + recorder.RecordEventsBytesSent(2048) + recorder.RecordPendingEvents(3) + + rm, err := p.collectMetrics() + require.NoError(t, err) + + droppedMetric := findMetric(rm, eventsSentDroppedMeasureName) + require.NotNil(t, droppedMetric, "events dropped metric not found") + + sentMetric := findMetric(rm, eventsSentCountMeasureName) + require.NotNil(t, sentMetric, "events sent metric not found") + + bytesMetric := findMetric(rm, eventsSentBytesMeasureName) + require.NotNil(t, bytesMetric, "events bytes sent metric not found") + + pendingMetric := findMetric(rm, eventsSentPendingMeasureName) + require.NotNil(t, pendingMetric, "events pending metric not found") }) } diff --git a/internal/metrics/test_utils_test.go b/internal/metrics/test_utils_test.go index 066d839a..af62dbb3 100644 --- a/internal/metrics/test_utils_test.go +++ b/internal/metrics/test_utils_test.go @@ -52,17 +52,8 @@ func testWithOTel(t *testing.T, action func(testWithOTelParams)) { meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) meter := meterProvider.Meter("ld-relay") - connections, _ := meter.Int64UpDownCounter(connMeasureName) - requests, _ := meter.Int64Counter(requestMeasureName) - requestDuration, _ := meter.Float64Histogram(requestDurationMeasureName) - eventsIngestedBytes, _ := meter.Int64Counter(eventsIngestedBytesMeasureName) - - instruments := &Instruments{ - connections: connections, - requests: requests, - requestDuration: requestDuration, - eventsIngestedBytes: eventsIngestedBytes, - } + instruments, err := NewInstrumentsForTest(meter) + require.NoError(t, err) manager, err := NewManager(config.OpenTelemetryConfig{}, time.Millisecond*10, mockLog.Loggers) require.NoError(t, err) diff --git a/internal/middleware/metrics_middleware.go b/internal/middleware/metrics_middleware.go index f6b42e0f..602fdd59 100644 --- a/internal/middleware/metrics_middleware.go +++ b/internal/middleware/metrics_middleware.go @@ -168,7 +168,7 @@ func RequestMetrics(measure metrics.Measure) mux.MiddlewareFunc { } } -// EventBytesMetrics is a middleware function that records the number of event bytes ingested. +// EventBytesMetrics is a middleware function that records the number of event bytes received. // This should be applied after GzipMiddleware so that it measures decompressed bytes. func EventBytesMetrics(platformCategory string) mux.MiddlewareFunc { return func(next http.Handler) http.Handler { @@ -182,7 +182,7 @@ func EventBytesMetrics(platformCategory string) mux.MiddlewareFunc { next.ServeHTTP(w, req) env := GetEnvContextInfo(req.Context()).Env ri := requestInfoFromHTTP(req) - metrics.RecordEventsIngestedBytes(req.Context(), getInstruments(env), env.GetMetricsEnv(), platformCategory, ri, cr.bytesRead.Load()) + metrics.RecordEventsReceivedBytes(req.Context(), getInstruments(env), env.GetMetricsEnv(), platformCategory, ri, cr.bytesRead.Load()) }) } } diff --git a/internal/middleware/metrics_middleware_test.go b/internal/middleware/metrics_middleware_test.go index 06f61d6e..7696044c 100644 --- a/internal/middleware/metrics_middleware_test.go +++ b/internal/middleware/metrics_middleware_test.go @@ -198,8 +198,8 @@ func TestEventBytesMetrics(t *testing.T) { router.ServeHTTP(httptest.NewRecorder(), req) rm := p.collectMetrics(t) - bytesMetric := st.FindMetricByName(rm, "launchdarkly.relay.events.ingested.bytes") - require.NotNil(t, bytesMetric, "events ingested bytes metric not found") + bytesMetric := st.FindMetricByName(rm, "launchdarkly.relay.events.received.bytes") + require.NotNil(t, bytesMetric, "events received bytes metric not found") assertMetricHasValue(t, bytesMetric, p.envName, "server", 35) }) } diff --git a/internal/relayenv/env_context_impl.go b/internal/relayenv/env_context_impl.go index 0ea92080..1dfa46c8 100644 --- a/internal/relayenv/env_context_impl.go +++ b/internal/relayenv/env_context_impl.go @@ -286,28 +286,6 @@ func NewEnvContext( wrapper := datadestination.NewDataDesinationWrapper(envStreamUpdates) envContext.wrapper = wrapper - var eventDispatcher *events.EventDispatcher - if allConfig.Events.SendEvents { - if offlineMode { - envLoggers.Info("Events will be accepted for this environment, but will be discarded, since offline mode is enabled") - } else { - envLoggers.Info("Proxying events for this environment") - eventLoggers := envLoggers - eventLoggers.SetPrefix(logPrefix + " (event proxy)") - eventDispatcher = events.NewEventDispatcher( - envConfig.SDKKey, - envConfig.MobileKey, - envConfig.EnvID, - envLoggers, - allConfig.Events, - httpConfig, - wrapper, - 0, // 0 here means "use the default interval for any periodic cleanup task you may need to run" - ) - } - } - envContext.eventDispatcher = eventDispatcher - streamURI := allConfig.Main.StreamURI.String() // config.ValidateConfig has ensured that this has a value baseURI := allConfig.Main.BaseURI.String() eventsURI := allConfig.Events.EventsURI.String() // ditto @@ -343,6 +321,37 @@ func NewEnvContext( envContext.metricsEnv = em + // Create an EventMetrics recorder for the event dispatchers to use when reporting + // internal metrics like dropped events. This must be done after the EnvironmentManager + // is created so we have access to the environment-level OTEL attributes. + var eventMetrics events.EventMetrics + if em != nil { + eventMetrics = em.NewEventMetricsRecorder(params.MetricsManager.GetInstruments()) + } + + var eventDispatcher *events.EventDispatcher + if allConfig.Events.SendEvents { + if offlineMode { + envLoggers.Info("Events will be accepted for this environment, but will be discarded, since offline mode is enabled") + } else { + envLoggers.Info("Proxying events for this environment") + eventLoggers := envLoggers + eventLoggers.SetPrefix(logPrefix + " (event proxy)") + eventDispatcher = events.NewEventDispatcher( + envConfig.SDKKey, + envConfig.MobileKey, + envConfig.EnvID, + envLoggers, + allConfig.Events, + httpConfig, + wrapper, + 0, // 0 here means "use the default interval for any periodic cleanup task you may need to run" + eventMetrics, + ) + } + } + envContext.eventDispatcher = eventDispatcher + disconnectedStatusTime := allConfig.Main.DisconnectedStatusTime.GetOrElse(config.DefaultDisconnectedStatusTime) streamingBuilder := ldcomponents.StreamingDataSourceV2().BaseURI(streamURI)