diff --git a/go.mod b/go.mod index 7b580af..7c19034 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/c12s/magnetar -go 1.22.3 +go 1.24.0 require ( github.com/c12s/agent_queue v0.0.0-00010101000000-000000000000 @@ -12,29 +12,43 @@ require ( github.com/juliangruber/go-intersect v1.1.0 github.com/nats-io/nats.go v1.31.0 go.etcd.io/etcd/client/v3 v3.5.9 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.66.0 + go.opentelemetry.io/otel v1.41.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.41.0 + go.opentelemetry.io/otel/sdk v1.41.0 golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b - google.golang.org/grpc v1.65.0 - google.golang.org/protobuf v1.34.1 + google.golang.org/grpc v1.79.1 + google.golang.org/protobuf v1.36.11 ) require ( + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/klauspost/compress v1.17.0 // indirect github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect go.etcd.io/etcd/api/v3 v3.5.9 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.41.0 // indirect + go.opentelemetry.io/otel/metric v1.41.0 // indirect + go.opentelemetry.io/otel/trace v1.41.0 // indirect + go.opentelemetry.io/proto/otlp v1.9.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect - golang.org/x/crypto v0.23.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/net v0.50.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/text v0.34.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect ) replace github.com/c12s/oort => ../oort diff --git a/go.sum b/go.sum index 6fd44d1..843bc59 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= @@ -7,6 +11,11 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -14,21 +23,25 @@ github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17w github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= github.com/juliangruber/go-intersect v1.1.0 h1:sc+y5dCjMMx0pAdYk/N6KBm00tD/f3tq+Iox7dYDUrY= github.com/juliangruber/go-intersect v1.1.0/go.mod h1:WMau+1kAmnlQnKiikekNJbtGtfmILU/mMU6H7AgKbWQ= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= @@ -39,11 +52,13 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/etcd/api/v3 v3.5.9 h1:4wSsluwyTbGGmyjJktOf3wFQoTBIURXHnq9n/G/JQHs= @@ -52,8 +67,30 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.9 h1:oidDC4+YEuSIQbsR94rY9gur91UPL6DnxDCIYd2I go.etcd.io/etcd/client/pkg/v3 v3.5.9/go.mod h1:y+CzeSmkMpWN2Jyu1npecjB9BBnABxGM4pN8cGuJeL4= go.etcd.io/etcd/client/v3 v3.5.9 h1:r5xghnU7CwbUxD/fbUtRyJGaYNfDun8sp/gTr1hew6E= go.etcd.io/etcd/client/v3 v3.5.9/go.mod h1:i/Eo5LrZ5IKqpbtpPDuaUnDOUv471oDg8cjQaUr2MbA= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.66.0 h1:w/o339tDd6Qtu3+ytwt+/jon2yjAs3Ot8Xq8pelfhSo= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.66.0/go.mod h1:pdhNtM9C4H5fRdrnwO7NjxzQWhKSSxCHk/KluVqDVC0= +go.opentelemetry.io/otel v1.41.0 h1:YlEwVsGAlCvczDILpUXpIpPSL/VPugt7zHThEMLce1c= +go.opentelemetry.io/otel v1.41.0/go.mod h1:Yt4UwgEKeT05QbLwbyHXEwhnjxNO6D8L5PQP51/46dE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.41.0 h1:ao6Oe+wSebTlQ1OEht7jlYTzQKE+pnx/iNywFvTbuuI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.41.0/go.mod h1:u3T6vz0gh/NVzgDgiwkgLxpsSF6PaPmo2il0apGJbls= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.41.0 h1:mq/Qcf28TWz719lE3/hMB4KkyDuLJIvgJnFGcd0kEUI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.41.0/go.mod h1:yk5LXEYhsL2htyDNJbEq7fWzNEigeEdV5xBF/Y+kAv0= +go.opentelemetry.io/otel/metric v1.41.0 h1:rFnDcs4gRzBcsO9tS8LCpgR0dxg4aaxWlJxCno7JlTQ= +go.opentelemetry.io/otel/metric v1.41.0/go.mod h1:xPvCwd9pU0VN8tPZYzDZV/BMj9CM9vs00GuBjeKhJps= +go.opentelemetry.io/otel/sdk v1.41.0 h1:YPIEXKmiAwkGl3Gu1huk1aYWwtpRLeskpV+wPisxBp8= +go.opentelemetry.io/otel/sdk v1.41.0/go.mod h1:ahFdU0G5y8IxglBf0QBJXgSe7agzjE4GiTJ6HT9ud90= +go.opentelemetry.io/otel/sdk/metric v1.41.0 h1:siZQIYBAUd1rlIWQT2uCxWJxcCO7q3TriaMlf08rXw8= +go.opentelemetry.io/otel/sdk/metric v1.41.0/go.mod h1:HNBuSvT7ROaGtGI50ArdRLUnvRTRGniSUZbxiWxSO8Y= +go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0= +go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U= @@ -61,8 +98,8 @@ go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI= golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -71,20 +108,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -93,14 +130,16 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= -google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= -google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= -google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0= +google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= +google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/internal/configs/config.go b/internal/configs/config.go index 949ff02..9997a14 100644 --- a/internal/configs/config.go +++ b/internal/configs/config.go @@ -12,6 +12,8 @@ type Config struct { meridianAddress string gravityAddress string tokenKey string + jaegerHost string + jaegerGRPCPort string } func (c *Config) NatsAddress() string { @@ -42,6 +44,17 @@ func (c *Config) TokenKey() string { return c.tokenKey } +func (c *Config) JaegerHost() string { + return c.jaegerHost +} + +func (c *Config) JaegerGRPCEndpoint() string { + if c.jaegerHost == "" || c.jaegerGRPCPort == "" { + return "" + } + return c.jaegerHost + ":" + c.jaegerGRPCPort +} + func NewFromEnv() (*Config, error) { return &Config{ natsAddress: os.Getenv("NATS_ADDRESS"), @@ -51,5 +64,7 @@ func NewFromEnv() (*Config, error) { meridianAddress: os.Getenv("MERIDIAN_ADDRESS"), gravityAddress: os.Getenv("GRAVITY_ADDRESS"), tokenKey: os.Getenv("SECRET_KEY"), + jaegerHost: os.Getenv("JAEGER_HOST"), + jaegerGRPCPort: os.Getenv("JAEGER_GRPC_PORT"), }, nil } diff --git a/internal/domain/node.go b/internal/domain/node.go index f39d982..aebe62f 100644 --- a/internal/domain/node.go +++ b/internal/domain/node.go @@ -1,5 +1,7 @@ package domain +import "context" + type Node struct { Id NodeId Org string @@ -25,16 +27,16 @@ type Selector struct { } type NodeRepo interface { - Put(node Node) error - Get(nodeId NodeId, org string) (*Node, error) - Delete(node Node) error - ListNodePool() ([]Node, error) - ListOrgOwnedNodes(org string) ([]Node, error) - QueryNodePool(query Query) ([]Node, error) - QueryOrgOwnedNodes(query Query, org string) ([]Node, error) - PutLabel(node Node, label Label) (*Node, error) - DeleteLabel(node Node, labelKey string) (*Node, error) - ListAllNodes() ([]Node, error) + Put(ctx context.Context, node Node) error + Get(ctx context.Context, nodeId NodeId, org string) (*Node, error) + Delete(ctx context.Context, node Node) error + ListNodePool(ctx context.Context) ([]Node, error) + ListOrgOwnedNodes(ctx context.Context, org string) ([]Node, error) + ListAllNodes(ctx context.Context) ([]Node, error) + QueryNodePool(ctx context.Context, query Query) ([]Node, error) + QueryOrgOwnedNodes(ctx context.Context, query Query, org string) ([]Node, error) + PutLabel(ctx context.Context, node Node, label Label) (*Node, error) + DeleteLabel(ctx context.Context, node Node, labelKey string) (*Node, error) } type NodeMarshaller interface { diff --git a/internal/repos/node_etcd.go b/internal/repos/node_etcd.go index 78591e5..b3521ca 100644 --- a/internal/repos/node_etcd.go +++ b/internal/repos/node_etcd.go @@ -10,6 +10,7 @@ import ( "github.com/c12s/magnetar/internal/domain" "github.com/juliangruber/go-intersect" etcd "go.etcd.io/etcd/client/v3" + "go.opentelemetry.io/otel" "golang.org/x/exp/slices" ) @@ -27,33 +28,42 @@ type nodeEtcdRepo struct { labelMarshaller domain.LabelMarshaller } -func NewNodeEtcdRepo(etcd *etcd.Client, nodeMarshaller domain.NodeMarshaller, labelMarshaller domain.LabelMarshaller) (domain.NodeRepo, error) { +func NewNodeEtcdRepo(cli *etcd.Client, nodeMarshaller domain.NodeMarshaller, labelMarshaller domain.LabelMarshaller) (domain.NodeRepo, error) { return &nodeEtcdRepo{ - etcd: etcd, + etcd: cli, nodeMarshaller: nodeMarshaller, labelMarshaller: labelMarshaller, }, nil } -func (n nodeEtcdRepo) Put(node domain.Node) error { - err := n.putNodeGetModel(node) +func (n nodeEtcdRepo) Put(ctx context.Context, node domain.Node) error { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.Put") + defer span.End() + err := n.putNodeGetModel(ctx, node) if err != nil { return err } - return n.putNodeQueryModel(node) + return n.putNodeQueryModel(ctx, node) } -func (n nodeEtcdRepo) Delete(node domain.Node) error { - err := n.deleteNodeGetModel(node) +func (n nodeEtcdRepo) Delete(ctx context.Context, node domain.Node) error { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.Delete") + defer span.End() + err := n.deleteNodeGetModel(ctx, node) if err != nil { return err } - return n.deleteNodeQueryModel(node) + return n.deleteNodeQueryModel(ctx, node) } -func (n nodeEtcdRepo) Get(nodeId domain.NodeId, org string) (*domain.Node, error) { +func (n nodeEtcdRepo) Get(ctx context.Context, nodeId domain.NodeId, org string) (*domain.Node, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.Get") + defer span.End() key := getKey(domain.Node{Id: nodeId, Org: org}) - resp, err := n.etcd.Get(context.TODO(), key) + resp, err := n.etcd.Get(ctx, key) if err != nil { return nil, err } @@ -63,32 +73,44 @@ func (n nodeEtcdRepo) Get(nodeId domain.NodeId, org string) (*domain.Node, error return n.nodeMarshaller.Unmarshal(resp.Kvs[0].Value) } -func (n nodeEtcdRepo) ListNodePool() ([]domain.Node, error) { +func (n nodeEtcdRepo) ListNodePool(ctx context.Context) ([]domain.Node, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.ListNodePool") + defer span.End() keyPrefix := fmt.Sprintf("%s/pool", getKeyPrefix) - return n.listNodes(keyPrefix) + return n.listNodes(ctx, keyPrefix) } -func (n nodeEtcdRepo) ListOrgOwnedNodes(org string) ([]domain.Node, error) { +func (n nodeEtcdRepo) ListOrgOwnedNodes(ctx context.Context, org string) ([]domain.Node, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.ListOrgOwnedNodes") + defer span.End() keyPrefix := fmt.Sprintf("%s/orgs/%s", getKeyPrefix, org) - return n.listNodes(keyPrefix) + return n.listNodes(ctx, keyPrefix) } -func (n nodeEtcdRepo) ListAllNodes() ([]domain.Node, error) { - return n.listNodes(getKeyPrefix) +func (n nodeEtcdRepo) ListAllNodes(ctx context.Context) ([]domain.Node, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.ListAllNodes") + defer span.End() + return n.listNodes(ctx, getKeyPrefix) } -func (n nodeEtcdRepo) QueryNodePool(query domain.Query) ([]domain.Node, error) { +func (n nodeEtcdRepo) QueryNodePool(ctx context.Context, query domain.Query) ([]domain.Node, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.QueryNodePool") + defer span.End() keyPrefix := fmt.Sprintf("%s/pool", queryKeyPrefix) if len(query) == 0 { - return n.listNodes(keyPrefix) + return n.listNodes(ctx, keyPrefix) } - nodeIds, err := n.queryNodes(query, keyPrefix) + nodeIds, err := n.queryNodes(ctx, query, keyPrefix) if err != nil { return nil, err } nodes := make([]domain.Node, 0) for _, nodeId := range nodeIds { - node, err := n.Get(nodeId, "") + node, err := n.Get(ctx, nodeId, "") if err != nil { log.Println(err) continue @@ -98,18 +120,21 @@ func (n nodeEtcdRepo) QueryNodePool(query domain.Query) ([]domain.Node, error) { return nodes, nil } -func (n nodeEtcdRepo) QueryOrgOwnedNodes(query domain.Query, org string) ([]domain.Node, error) { +func (n nodeEtcdRepo) QueryOrgOwnedNodes(ctx context.Context, query domain.Query, org string) ([]domain.Node, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.QueryOrgOwnedNodes") + defer span.End() keyPrefix := fmt.Sprintf("%s/orgs/%s", queryKeyPrefix, org) if len(query) == 0 { - return n.ListOrgOwnedNodes(org) + return n.ListOrgOwnedNodes(ctx, org) } - nodeIds, err := n.queryNodes(query, keyPrefix) + nodeIds, err := n.queryNodes(ctx, query, keyPrefix) if err != nil { return nil, err } nodes := make([]domain.Node, 0) for _, nodeId := range nodeIds { - node, err := n.Get(nodeId, org) + node, err := n.Get(ctx, nodeId, org) if err != nil { log.Println(err) continue @@ -119,9 +144,12 @@ func (n nodeEtcdRepo) QueryOrgOwnedNodes(query domain.Query, org string) ([]doma return nodes, nil } -func (n nodeEtcdRepo) listNodes(keyPrefix string) ([]domain.Node, error) { +func (n nodeEtcdRepo) listNodes(ctx context.Context, keyPrefix string) ([]domain.Node, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.listNodes") + defer span.End() nodes := make([]domain.Node, 0) - resp, err := n.etcd.Get(context.TODO(), keyPrefix, etcd.WithPrefix()) + resp, err := n.etcd.Get(ctx, keyPrefix, etcd.WithPrefix()) if err != nil { return nil, err } @@ -135,56 +163,74 @@ func (n nodeEtcdRepo) listNodes(keyPrefix string) ([]domain.Node, error) { return nodes, nil } -func (n nodeEtcdRepo) PutLabel(node domain.Node, label domain.Label) (*domain.Node, error) { - err := n.putLabelGetModel(node, label) +func (n nodeEtcdRepo) PutLabel(ctx context.Context, node domain.Node, label domain.Label) (*domain.Node, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.PutLabel") + defer span.End() + err := n.putLabelGetModel(ctx, node, label) if err != nil { return nil, err } - err = n.putLabelQueryModel(node, label) + err = n.putLabelQueryModel(ctx, node, label) if err != nil { return nil, err } - return n.Get(node.Id, node.Org) + return n.Get(ctx, node.Id, node.Org) } -func (n nodeEtcdRepo) DeleteLabel(node domain.Node, labelKey string) (*domain.Node, error) { - err := n.deleteLabelGetModel(node, labelKey) +func (n nodeEtcdRepo) DeleteLabel(ctx context.Context, node domain.Node, labelKey string) (*domain.Node, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.DeleteLabel") + defer span.End() + err := n.deleteLabelGetModel(ctx, node, labelKey) if err != nil { return nil, err } - err = n.deleteLabelQueryModel(node, labelKey) + err = n.deleteLabelQueryModel(ctx, node, labelKey) if err != nil { return nil, err } - return n.Get(node.Id, node.Org) + return n.Get(ctx, node.Id, node.Org) } -func (n nodeEtcdRepo) deleteNodeGetModel(node domain.Node) error { +func (n nodeEtcdRepo) deleteNodeGetModel(ctx context.Context, node domain.Node) error { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.deleteNodeGetModel") + defer span.End() key := getKey(node) - _, err := n.etcd.Delete(context.TODO(), key) + _, err := n.etcd.Delete(ctx, key) return err } -func (n nodeEtcdRepo) deleteNodeQueryModel(node domain.Node) (err error) { +func (n nodeEtcdRepo) deleteNodeQueryModel(ctx context.Context, node domain.Node) (err error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.deleteNodeQueryModel") + defer span.End() for _, label := range node.Labels { key := queryKey(node, label.Key()) - _, delErr := n.etcd.Delete(context.TODO(), key) + _, delErr := n.etcd.Delete(ctx, key) err = errors.Join(err, delErr) } return err } -func (n nodeEtcdRepo) putNodeGetModel(node domain.Node) error { +func (n nodeEtcdRepo) putNodeGetModel(ctx context.Context, node domain.Node) error { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.putNodeGetModel") + defer span.End() nodeMarshalled, err := n.nodeMarshaller.Marshal(node) if err != nil { return err } key := getKey(node) - _, err = n.etcd.Put(context.TODO(), key, string(nodeMarshalled)) + _, err = n.etcd.Put(ctx, key, string(nodeMarshalled)) return err } -func (n nodeEtcdRepo) putLabelGetModel(node domain.Node, label domain.Label) error { +func (n nodeEtcdRepo) putLabelGetModel(ctx context.Context, node domain.Node, label domain.Label) error { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.putLabelGetModel") + defer span.End() labelIndex := -1 for i, nodeLabel := range node.Labels { if nodeLabel.Key() == label.Key() { @@ -196,21 +242,27 @@ func (n nodeEtcdRepo) putLabelGetModel(node domain.Node, label domain.Label) err } else { node.Labels = append(node.Labels, label) } - return n.putNodeGetModel(node) + return n.putNodeGetModel(ctx, node) } -func (n nodeEtcdRepo) deleteLabelGetModel(node domain.Node, labelKey string) error { +func (n nodeEtcdRepo) deleteLabelGetModel(ctx context.Context, node domain.Node, labelKey string) error { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.deleteLabelGetModel") + defer span.End() labelIndex := findIndexByLabelKey(node, labelKey) if labelIndex >= 0 { node.Labels = slices.Delete(node.Labels, labelIndex, labelIndex+1) - return n.putNodeGetModel(node) + return n.putNodeGetModel(ctx, node) } return domain.ErrNotFound("label") } -func (n nodeEtcdRepo) putNodeQueryModel(node domain.Node) error { +func (n nodeEtcdRepo) putNodeQueryModel(ctx context.Context, node domain.Node) error { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.putNodeQueryModel") + defer span.End() for _, label := range node.Labels { - err := n.putLabelQueryModel(node, label) + err := n.putLabelQueryModel(ctx, node, label) if err != nil { return err } @@ -218,26 +270,35 @@ func (n nodeEtcdRepo) putNodeQueryModel(node domain.Node) error { return nil } -func (n nodeEtcdRepo) putLabelQueryModel(node domain.Node, label domain.Label) error { +func (n nodeEtcdRepo) putLabelQueryModel(ctx context.Context, node domain.Node, label domain.Label) error { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.putLabelQueryModel") + defer span.End() labelMarshalled, err := n.labelMarshaller.Marshal(label) if err != nil { return err } key := queryKey(node, label.Key()) - _, err = n.etcd.Put(context.TODO(), key, string(labelMarshalled)) + _, err = n.etcd.Put(ctx, key, string(labelMarshalled)) return err } -func (n nodeEtcdRepo) deleteLabelQueryModel(node domain.Node, labelKey string) error { +func (n nodeEtcdRepo) deleteLabelQueryModel(ctx context.Context, node domain.Node, labelKey string) error { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.deleteLabelQueryModel") + defer span.End() key := queryKey(node, labelKey) - _, err := n.etcd.Delete(context.TODO(), key) + _, err := n.etcd.Delete(ctx, key) return err } -func (n nodeEtcdRepo) queryNodes(query domain.Query, keyPrefix string) ([]domain.NodeId, error) { +func (n nodeEtcdRepo) queryNodes(ctx context.Context, query domain.Query, keyPrefix string) ([]domain.NodeId, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.DeletqueryNodeseLabel") + defer span.End() nodeIds := make([]domain.NodeId, 0) for i, selector := range query { - currNodes, err := n.selectNodes(selector, keyPrefix) + currNodes, err := n.selectNodes(ctx, selector, keyPrefix) if err != nil { return nil, err } @@ -254,9 +315,12 @@ func (n nodeEtcdRepo) queryNodes(query domain.Query, keyPrefix string) ([]domain return nodeIds, nil } -func (n nodeEtcdRepo) selectNodes(selector domain.Selector, keyPrefix string) ([]domain.NodeId, error) { +func (n nodeEtcdRepo) selectNodes(ctx context.Context, selector domain.Selector, keyPrefix string) ([]domain.NodeId, error) { + tracer := otel.Tracer("magnetar.NodeRepo") + ctx, span := tracer.Start(ctx, "NodeRepo.selectNodes") + defer span.End() prefix := fmt.Sprintf("%s/%s/", keyPrefix, selector.LabelKey) - resp, err := n.etcd.Get(context.TODO(), prefix, etcd.WithPrefix()) + resp, err := n.etcd.Get(ctx, prefix, etcd.WithPrefix()) if err != nil { return nil, err } diff --git a/internal/servers/registration_async.go b/internal/servers/registration_async.go index ee212a3..c9b9628 100644 --- a/internal/servers/registration_async.go +++ b/internal/servers/registration_async.go @@ -1,20 +1,23 @@ package servers import ( + "context" + "log" + "github.com/c12s/magnetar/internal/mappers/proto" "github.com/c12s/magnetar/internal/services" "github.com/c12s/magnetar/pkg/api" "github.com/c12s/magnetar/pkg/messaging" - "log" + "go.opentelemetry.io/otel" ) type RegistrationAsyncServer struct { subscriber messaging.Subscriber publisher messaging.Publisher - service services.RegistrationService + service *services.RegistrationService } -func NewRegistrationAsyncServer(subscriber messaging.Subscriber, publisher messaging.Publisher, service services.RegistrationService) (*RegistrationAsyncServer, error) { +func NewRegistrationAsyncServer(subscriber messaging.Subscriber, publisher messaging.Publisher, service *services.RegistrationService) (*RegistrationAsyncServer, error) { return &RegistrationAsyncServer{ subscriber: subscriber, publisher: publisher, @@ -26,42 +29,54 @@ func (n *RegistrationAsyncServer) Serve() error { return n.subscriber.Subscribe(n.register) } -func (n *RegistrationAsyncServer) register(msg []byte, replySubject string) { +func (n *RegistrationAsyncServer) register(ctx context.Context, msg []byte, replySubject string) { + tracer := otel.Tracer("magnetar.RegistrationAsyncServer") + ctx, span := tracer.Start(ctx, "RegisterMessage") + defer span.End() + reqProto := &api.RegistrationReq{} - err := reqProto.Unmarshal(msg) - if err != nil { + if err := reqProto.Unmarshal(msg); err != nil { + span.RecordError(err) log.Println(err) return } + req, err := proto.RegistrationReqToDomain(reqProto) if err != nil { + span.RecordError(err) log.Println(err) return } - resp, err := n.service.Register(*req) + + resp, err := n.service.Register(ctx, *req) if err != nil { + span.RecordError(err) log.Println(err) return } + respProto, err := proto.RegistrationRespFromDomain(*resp) if err != nil { + span.RecordError(err) log.Println(err) return } + respMarshalled, err := respProto.Marshal() if err != nil { + span.RecordError(err) log.Println(err) return } - err = n.publisher.Publish(respMarshalled, replySubject) - if err != nil { + + if err := n.publisher.Publish(ctx, respMarshalled, replySubject); err != nil { + span.RecordError(err) log.Println(err) } } func (n *RegistrationAsyncServer) GracefulStop() { - err := n.subscriber.Unsubscribe() - if err != nil { + if err := n.subscriber.Unsubscribe(); err != nil { log.Println(err) } } diff --git a/internal/services/authorization.go b/internal/services/authorization.go index 5b217a7..2b61532 100644 --- a/internal/services/authorization.go +++ b/internal/services/authorization.go @@ -3,9 +3,11 @@ package services import ( "context" "fmt" - "github.com/golang-jwt/jwt/v5" "log" "strings" + + "github.com/golang-jwt/jwt/v5" + "go.opentelemetry.io/otel" ) type AuthZService struct { @@ -17,6 +19,9 @@ func NewAuthZService(key string) AuthZService { } func (s AuthZService) Authorize(ctx context.Context, permName string, objKind string, objId string) bool { + tracer := otel.Tracer("magnetar.AuthorizeService") + ctx, span := tracer.Start(ctx, "Authorize") + defer span.End() tokenString, ok := ctx.Value("authz-token").(string) if !ok { log.Println("no token provided") diff --git a/internal/services/labels.go b/internal/services/labels.go index 68e1ab7..1f6fcdc 100644 --- a/internal/services/labels.go +++ b/internal/services/labels.go @@ -2,8 +2,12 @@ package services import ( "context" + "log" + "github.com/c12s/magnetar/internal/domain" oortapi "github.com/c12s/oort/pkg/api" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" ) type LabelService struct { @@ -19,34 +23,72 @@ func NewLabelService(nodeRepo domain.NodeRepo, evaluator oortapi.OortEvaluatorCl } func (l *LabelService) PutLabel(ctx context.Context, req domain.PutLabelReq) (*domain.PutLabelResp, error) { + tracer := otel.Tracer("magnetar.LabelService") + ctx, span := tracer.Start(ctx, "PutLabel") + defer span.End() + + span.SetAttributes( + attribute.String("nodeId", req.NodeId.Value), + attribute.String("label", req.Label.Key()), + ) + if !l.authorizer.Authorize(ctx, "node.label.put", "node", req.NodeId.Value) { + span.AddEvent("authorization failed") return nil, domain.ErrForbidden } - node, err := l.nodeRepo.Get(req.NodeId, req.Org) + span.AddEvent("authorization granted") + + node, err := l.nodeRepo.Get(ctx, req.NodeId, req.Org) if err != nil { + span.RecordError(err) + log.Println("Get node error:", err) return nil, err } - node, err = l.nodeRepo.PutLabel(*node, req.Label) + + node, err = l.nodeRepo.PutLabel(ctx, *node, req.Label) if err != nil { + span.RecordError(err) + log.Println("PutLabel error:", err) return nil, err } + + span.AddEvent("label updated") return &domain.PutLabelResp{ Node: *node, }, nil } func (l *LabelService) DeleteLabel(ctx context.Context, req domain.DeleteLabelReq) (*domain.DeleteLabelResp, error) { + tracer := otel.Tracer("magnetar.LabelService") + ctx, span := tracer.Start(ctx, "DeleteLabel") + defer span.End() + + span.SetAttributes( + attribute.String("nodeId", req.NodeId.Value), + attribute.String("labelKey", req.LabelKey), + ) + if !l.authorizer.Authorize(ctx, "node.label.delete", "node", req.NodeId.Value) { + span.AddEvent("authorization failed") return nil, domain.ErrForbidden } - node, err := l.nodeRepo.Get(req.NodeId, req.Org) + span.AddEvent("authorization granted") + + node, err := l.nodeRepo.Get(ctx, req.NodeId, req.Org) if err != nil { + span.RecordError(err) + log.Println("Get node error:", err) return nil, err } - node, err = l.nodeRepo.DeleteLabel(*node, req.LabelKey) + + node, err = l.nodeRepo.DeleteLabel(ctx, *node, req.LabelKey) if err != nil { + span.RecordError(err) + log.Println("DeleteLabel error:", err) return nil, err } + + span.AddEvent("label deleted") return &domain.DeleteLabelResp{ Node: *node, }, nil diff --git a/internal/services/nodes.go b/internal/services/nodes.go index 50f0913..9495855 100644 --- a/internal/services/nodes.go +++ b/internal/services/nodes.go @@ -8,6 +8,7 @@ import ( "github.com/c12s/magnetar/internal/domain" meridian_api "github.com/c12s/meridian/pkg/api" oortapi "github.com/c12s/oort/pkg/api" + "go.opentelemetry.io/otel" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -21,7 +22,14 @@ type NodeService struct { gravity gravity_api.AgentQueueClient } -func NewNodeService(nodeRepo domain.NodeRepo, evaluator oortapi.OortEvaluatorClient, administrator *oortapi.AdministrationAsyncClient, authorizer AuthZService, meridian meridian_api.MeridianClient, gravity gravity_api.AgentQueueClient) (*NodeService, error) { +func NewNodeService( + nodeRepo domain.NodeRepo, + evaluator oortapi.OortEvaluatorClient, + administrator *oortapi.AdministrationAsyncClient, + authorizer AuthZService, + meridian meridian_api.MeridianClient, + gravity gravity_api.AgentQueueClient, +) (*NodeService, error) { return &NodeService{ nodeRepo: nodeRepo, administrator: administrator, @@ -32,90 +40,101 @@ func NewNodeService(nodeRepo domain.NodeRepo, evaluator oortapi.OortEvaluatorCli } func (n *NodeService) GetFromNodePool(ctx context.Context, req domain.GetFromNodePoolReq) (*domain.GetFromNodePoolResp, error) { - node, err := n.nodeRepo.Get(req.Id, "") + tracer := otel.Tracer("magnetar.NodeService") + ctx, span := tracer.Start(ctx, "NodeService.GetFromNodePool") + defer span.End() + + node, err := n.nodeRepo.Get(ctx, req.Id, "") if err != nil { + span.RecordError(err) return nil, err } - return &domain.GetFromNodePoolResp{ - Node: *node, - }, nil + return &domain.GetFromNodePoolResp{Node: *node}, nil } func (n *NodeService) GetFromOrg(ctx context.Context, req domain.GetFromOrgReq) (*domain.GetFromOrgResp, error) { + tracer := otel.Tracer("magnetar.NodeService") + ctx, span := tracer.Start(ctx, "NodeService.GetFromOrg") + defer span.End() + if !n.authorizer.Authorize(ctx, "node.get", "node", req.Id.Value) { - return nil, domain.ErrForbidden + err := domain.ErrForbidden + span.RecordError(err) + return nil, err } - node, err := n.nodeRepo.Get(req.Id, req.Org) + + node, err := n.nodeRepo.Get(ctx, req.Id, req.Org) if err != nil { + span.RecordError(err) return nil, err } - return &domain.GetFromOrgResp{ - Node: *node, - }, nil + return &domain.GetFromOrgResp{Node: *node}, nil } func (n *NodeService) ClaimOwnership(ctx context.Context, req domain.ClaimOwnershipReq) (*domain.ClaimOwnershipResp, error) { + tracer := otel.Tracer("magnetar.NodeService") + ctx, span := tracer.Start(ctx, "NodeService.ClaimOwnership") + defer span.End() + if !n.authorizer.Authorize(ctx, "node.put", "org", req.Org) { - return nil, domain.ErrForbidden + err := domain.ErrForbidden + span.RecordError(err) + return nil, err } - cluster, err := n.nodeRepo.ListOrgOwnedNodes(req.Org) + + cluster, err := n.nodeRepo.ListOrgOwnedNodes(ctx, req.Org) if err != nil { + span.RecordError(err) return nil, err } - nodes, err := n.nodeRepo.QueryNodePool(req.Query) + + nodes, err := n.nodeRepo.QueryNodePool(ctx, req.Query) if err != nil { + span.RecordError(err) return nil, err } + for _, node := range nodes { - err = n.nodeRepo.Delete(node) - if err != nil { + if err := n.nodeRepo.Delete(ctx, node); err != nil { log.Println(err) + span.RecordError(err) continue } node.Org = req.Org - err = n.nodeRepo.Put(node) - if err != nil { + if err := n.nodeRepo.Put(ctx, node); err != nil { log.Println(err) + span.RecordError(err) continue } - err = n.administrator.SendRequest(&oortapi.CreateInheritanceRelReq{ - From: &oortapi.Resource{ - Id: req.Org, - Kind: "org", - }, - To: &oortapi.Resource{ - Id: node.Id.Value, - Kind: "node", - }, + if err := n.administrator.SendRequest(ctx, &oortapi.CreateInheritanceRelReq{ + From: &oortapi.Resource{Id: req.Org, Kind: "org"}, + To: &oortapi.Resource{Id: node.Id.Value, Kind: "node"}, }, func(resp *oortapi.AdministrationAsyncResp) { if resp.Error != "" { log.Println(resp.Error) } - }) - if err != nil { + }); err != nil { log.Println(err) + span.RecordError(err) } } - // upsert ns - listNodesResp, err := n.ListOrgOwnedNodes(ctx, domain.ListOrgOwnedNodesReq{ - Org: req.Org, - }) + + listNodesResp, err := n.ListOrgOwnedNodes(ctx, domain.ListOrgOwnedNodesReq{Org: req.Org}) if err != nil { + span.RecordError(err) return nil, err } + resources := make(map[string]float64) for _, node := range listNodesResp.Nodes { for resource, quota := range node.Resources { - resources[resource] = resources[resource] + quota + resources[resource] += quota } } + ctx = setOutgoingContext(ctx) - _, err = n.meridian.GetNamespace(ctx, &meridian_api.GetNamespaceReq{ - OrgId: req.Org, - Name: "default", - }) + _, err = n.meridian.GetNamespace(ctx, &meridian_api.GetNamespaceReq{OrgId: req.Org, Name: "default"}) if err != nil { - log.Println(err) status, _ := status.FromError(err) if status.Code() == codes.NotFound { _, err = n.meridian.AddNamespace(ctx, &meridian_api.AddNamespaceReq{ @@ -127,11 +146,12 @@ func (n *NodeService) ClaimOwnership(ctx context.Context, req domain.ClaimOwners Profile: &meridian_api.SeccompProfile{ Version: "v1.0.0", DefaultAction: "ALLOW", - Syscalls: make([]*meridian_api.SyscallRule, 0), + Syscalls: []*meridian_api.SyscallRule{}, }, }) if err != nil { log.Println(err) + span.RecordError(err) } } } else { @@ -142,89 +162,114 @@ func (n *NodeService) ClaimOwnership(ctx context.Context, req domain.ClaimOwners }) if err != nil { log.Println(err) + span.RecordError(err) } } - // join cluster + if len(nodes) == 0 { - return &domain.ClaimOwnershipResp{ - Nodes: nodes, - }, nil + return &domain.ClaimOwnershipResp{Nodes: nodes}, nil } + joinAddress := nodes[0].BindAddress if len(cluster) > 0 { joinAddress = cluster[0].BindAddress } - log.Println("join address: " + joinAddress) + for _, node := range nodes { ctx = setOutgoingContext(ctx) - _, err = n.gravity.JoinCluster(ctx, &gravity_api.JoinClusterRequest{ + if _, err := n.gravity.JoinCluster(ctx, &gravity_api.JoinClusterRequest{ NodeId: node.Id.Value, JoinAddress: joinAddress, ClusterId: req.Org, - }) - if err != nil { + }); err != nil { log.Println(err) + span.RecordError(err) } } - return &domain.ClaimOwnershipResp{ - Nodes: nodes, - }, nil + + return &domain.ClaimOwnershipResp{Nodes: nodes}, nil } func (n *NodeService) ListNodePool(ctx context.Context, req domain.ListNodePoolReq) (*domain.ListNodePoolResp, error) { - nodes, err := n.nodeRepo.ListNodePool() + tracer := otel.Tracer("magnetar.NodeService") + ctx, span := tracer.Start(ctx, "NodeService.ListNodePool") + defer span.End() + + nodes, err := n.nodeRepo.ListNodePool(ctx) if err != nil { + span.RecordError(err) return nil, err } - return &domain.ListNodePoolResp{ - Nodes: nodes, - }, nil + return &domain.ListNodePoolResp{Nodes: nodes}, nil } func (n *NodeService) ListOrgOwnedNodes(ctx context.Context, req domain.ListOrgOwnedNodesReq) (*domain.ListOrgOwnedNodesResp, error) { + tracer := otel.Tracer("magnetar.NodeService") + ctx, span := tracer.Start(ctx, "NodeService.ListOrgOwnedNodes") + defer span.End() + if !n.authorizer.Authorize(ctx, "node.get", "org", req.Org) { - return nil, domain.ErrForbidden + err := domain.ErrForbidden + span.RecordError(err) + return nil, err } - nodes, err := n.nodeRepo.ListOrgOwnedNodes(req.Org) + + nodes, err := n.nodeRepo.ListOrgOwnedNodes(ctx, req.Org) if err != nil { + span.RecordError(err) return nil, err } - return &domain.ListOrgOwnedNodesResp{ - Nodes: nodes, - }, nil + return &domain.ListOrgOwnedNodesResp{Nodes: nodes}, nil } func (n *NodeService) ListAllNodes(ctx context.Context) ([]domain.Node, error) { - return n.nodeRepo.ListAllNodes() + tracer := otel.Tracer("magnetar.NodeService") + ctx, span := tracer.Start(ctx, "NodeService.ListAllNodes") + defer span.End() + + nodes, err := n.nodeRepo.ListAllNodes(ctx) + if err != nil { + span.RecordError(err) + return nil, err + } + return nodes, nil } func (n *NodeService) QueryNodePool(ctx context.Context, req domain.QueryNodePoolReq) (*domain.QueryNodePoolResp, error) { - nodes, err := n.nodeRepo.QueryNodePool(req.Query) + tracer := otel.Tracer("magnetar.NodeService") + ctx, span := tracer.Start(ctx, "NodeService.QueryNodePool") + defer span.End() + + nodes, err := n.nodeRepo.QueryNodePool(ctx, req.Query) if err != nil { + span.RecordError(err) return nil, err } - return &domain.QueryNodePoolResp{ - Nodes: nodes, - }, nil + return &domain.QueryNodePoolResp{Nodes: nodes}, nil } func (n *NodeService) QueryOrgOwnedNodes(ctx context.Context, req domain.QueryOrgOwnedNodesReq) (*domain.QueryOrgOwnedNodesResp, error) { + tracer := otel.Tracer("magnetar.NodeService") + ctx, span := tracer.Start(ctx, "NodeService.QueryOrgOwnedNodes") + defer span.End() + if !n.authorizer.Authorize(ctx, "node.get", "org", req.Org) { - return nil, domain.ErrForbidden + err := domain.ErrForbidden + span.RecordError(err) + return nil, err } - nodes, err := n.nodeRepo.QueryOrgOwnedNodes(req.Query, req.Org) + + nodes, err := n.nodeRepo.QueryOrgOwnedNodes(ctx, req.Query, req.Org) if err != nil { + span.RecordError(err) return nil, err } - return &domain.QueryOrgOwnedNodesResp{ - Nodes: nodes, - }, nil + return &domain.QueryOrgOwnedNodesResp{Nodes: nodes}, nil } func setOutgoingContext(ctx context.Context) context.Context { md, ok := metadata.FromIncomingContext(ctx) if !ok { - log.Println("[WARN] no metadata in ctx when sending req") return ctx } return metadata.NewOutgoingContext(ctx, md) diff --git a/internal/services/registration.go b/internal/services/registration.go index 9b69467..28b45b3 100644 --- a/internal/services/registration.go +++ b/internal/services/registration.go @@ -1,8 +1,11 @@ package services import ( + "context" + "github.com/c12s/magnetar/internal/domain" "github.com/google/uuid" + "go.opentelemetry.io/otel" ) type RegistrationService struct { @@ -15,18 +18,23 @@ func NewRegistrationService(nodeRepo domain.NodeRepo) (*RegistrationService, err }, nil } -func (r *RegistrationService) Register(req domain.RegistrationReq) (*domain.RegistrationResp, error) { +func (r *RegistrationService) Register(ctx context.Context, req domain.RegistrationReq) (*domain.RegistrationResp, error) { + tracer := otel.Tracer("magnetar.RegistrationService") + ctx, span := tracer.Start(ctx, "Register") + defer span.End() + node := domain.Node{ Id: domain.NodeId{ Value: generateNodeId(), }, - Labels: req.Labels, - Resources: req.Resources, + Labels: req.Labels, + Resources: req.Resources, BindAddress: req.BindAddress, } - err := r.nodeRepo.Put(node) + err := r.nodeRepo.Put(ctx, node) if err != nil { + span.RecordError(err) return nil, err } diff --git a/internal/startup/app.go b/internal/startup/app.go index 2c4c3bf..7781ab2 100644 --- a/internal/startup/app.go +++ b/internal/startup/app.go @@ -21,6 +21,8 @@ import ( oortapi "github.com/c12s/oort/pkg/api" natsgo "github.com/nats-io/nats.go" etcd "go.etcd.io/etcd/client/v3" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/reflection" @@ -60,6 +62,11 @@ func NewAppWithConfig(config *configs.Config) (*app, error) { } func (a *app) Start() error { + shutdownTracing := initTracing( + "magnetar", + a.config.JaegerGRPCEndpoint(), + ) + a.shutdownProcesses = append(a.shutdownProcesses, shutdownTracing) a.init() err := a.startRegistrationServer() @@ -145,7 +152,12 @@ func (a *app) initGrpcServer() { if a.magnetarServer == nil { log.Fatalln("magnetar server is nil") } - s := grpc.NewServer(grpc.UnaryInterceptor(servers.GetAuthInterceptor())) + + s := grpc.NewServer( + grpc.StatsHandler(otelgrpc.NewServerHandler()), + grpc.UnaryInterceptor(servers.GetAuthInterceptor()), + ) + api.RegisterMagnetarServer(s, a.magnetarServer) reflection.Register(s) a.grpcServer = s @@ -175,11 +187,19 @@ func (a *app) initRegistrationServer() { if a.registrationSubscriber == nil { log.Fatalln("registration req subscriber is nil") } - server, err := servers.NewRegistrationAsyncServer(a.registrationSubscriber, a.publisher, *a.registrationService) + + tracer := otel.Tracer("magnetar.AppInit") + _, span := tracer.Start(context.Background(), "InitRegistrationServer") + defer span.End() + + server, err := servers.NewRegistrationAsyncServer(a.registrationSubscriber, a.publisher, a.registrationService) if err != nil { + span.RecordError(err) log.Fatalln(err) } + a.registrationServer = server + log.Println("Registration async server initialized") } func (a *app) initRegistrationService() { @@ -203,6 +223,7 @@ func (a *app) initNodeService() { if a.gravity == nil { log.Fatalln("gravity is nil") } + nodeService, err := services.NewNodeService(a.nodeRepo, a.evaluatorClient, a.administratorClient, a.authzService, a.meridian, a.gravity) if err != nil { log.Fatalln(err) @@ -226,7 +247,11 @@ func (a *app) initAuthZService() { } func (a *app) initMeridian() { - conn, err := grpc.NewClient(a.config.MeridianAddress(), grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient( + a.config.MeridianAddress(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), + ) if err != nil { log.Fatalln(err) } @@ -234,8 +259,11 @@ func (a *app) initMeridian() { } func (a *app) initGravity() { - log.Println(a.config.GravityAddress()) - conn, err := grpc.NewClient(a.config.GravityAddress(), grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient( + a.config.GravityAddress(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), + ) if err != nil { log.Fatalln(err) } diff --git a/internal/startup/tracing.go b/internal/startup/tracing.go new file mode 100644 index 0000000..911f969 --- /dev/null +++ b/internal/startup/tracing.go @@ -0,0 +1,45 @@ +package startup + +import ( + "context" + "log" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" +) + +func initTracing(serviceName, endpoint string) func() { + exp, err := otlptracegrpc.New( + context.Background(), + otlptracegrpc.WithEndpoint(endpoint), + otlptracegrpc.WithInsecure(), + ) + if err != nil { + log.Fatalf("otlp grpc exporter error: %v", err) + } + + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + sdktrace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(serviceName), + )), + ) + + otel.SetTracerProvider(tp) + + otel.SetTextMapPropagator( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ) + + return func() { + _ = tp.Shutdown(context.Background()) + } +} diff --git a/pkg/api/registration_async.go b/pkg/api/registration_async.go index 53183d7..e7e43bf 100644 --- a/pkg/api/registration_async.go +++ b/pkg/api/registration_async.go @@ -1,6 +1,7 @@ package api import ( + context "context" "fmt" "log" @@ -34,7 +35,7 @@ func NewRegistrationAsyncClient(natsAddress string) (*RegistrationAsyncClient, e }, nil } -func (n *RegistrationAsyncClient) Register(req *RegistrationReq, callback RegistrationCallback) error { +func (n *RegistrationAsyncClient) Register(ctx context.Context, req *RegistrationReq, callback RegistrationCallback) error { reqMarshalled, err := req.Marshal() if err != nil { return err @@ -42,7 +43,7 @@ func (n *RegistrationAsyncClient) Register(req *RegistrationReq, callback Regist replySubject := n.publisher.GenerateReplySubject() subscriber := n.subscriberFactory(replySubject) - err = subscriber.Subscribe(func(msg []byte, _ string) { + err = subscriber.Subscribe(func(subCtx context.Context, msg []byte, _ string) { resp := &RegistrationResp{} err := resp.Unmarshal(msg) if err != nil { @@ -56,7 +57,7 @@ func (n *RegistrationAsyncClient) Register(req *RegistrationReq, callback Regist } // send request - err = n.publisher.Request(reqMarshalled, RegistrationSubject, replySubject) + err = n.publisher.Request(ctx, reqMarshalled, RegistrationSubject, replySubject) if err != nil { _ = subscriber.Unsubscribe() return err diff --git a/pkg/messaging/messaging.go b/pkg/messaging/messaging.go index f330605..44a0b50 100644 --- a/pkg/messaging/messaging.go +++ b/pkg/messaging/messaging.go @@ -1,12 +1,14 @@ package messaging +import "context" + type Subscriber interface { - Subscribe(handler func(msg []byte, replySubject string)) error + Subscribe(handler func(ctx context.Context, msg []byte, replySubject string)) error Unsubscribe() error } type Publisher interface { - Publish(msg []byte, subject string) error - Request(msg []byte, subject, replySubject string) error + Publish(ctx context.Context, msg []byte, subject string) error + Request(ctx context.Context, msg []byte, subject, replySubject string) error GenerateReplySubject() string } diff --git a/pkg/messaging/nats/publisher.go b/pkg/messaging/nats/publisher.go index 83136d8..65a4943 100644 --- a/pkg/messaging/nats/publisher.go +++ b/pkg/messaging/nats/publisher.go @@ -1,9 +1,13 @@ package nats import ( + "context" "errors" + "github.com/c12s/magnetar/pkg/messaging" "github.com/nats-io/nats.go" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" ) type publisher struct { @@ -19,12 +23,43 @@ func NewPublisher(conn *nats.Conn) (messaging.Publisher, error) { }, nil } -func (p publisher) Publish(msg []byte, subject string) error { - return p.conn.Publish(subject, msg) +func (p publisher) Publish(ctx context.Context, msg []byte, subject string) error { + + tracer := otel.Tracer("nats.publisher") + ctx, span := tracer.Start(ctx, "NATS Publish "+subject) + defer span.End() + + natsMsg := &nats.Msg{ + Subject: subject, + Data: msg, + Header: nats.Header{}, + } + + otel.GetTextMapPropagator().Inject(ctx, + propagation.HeaderCarrier(natsMsg.Header), + ) + + return p.conn.PublishMsg(natsMsg) } -func (p publisher) Request(msg []byte, subject, replySubject string) error { - return p.conn.PublishRequest(subject, replySubject, msg) +func (p publisher) Request(ctx context.Context, msg []byte, subject, replySubject string) error { + + tracer := otel.Tracer("nats.publisher") + ctx, span := tracer.Start(ctx, "NATS Request "+subject) + defer span.End() + + natsMsg := &nats.Msg{ + Subject: subject, + Reply: replySubject, + Data: msg, + Header: nats.Header{}, + } + + otel.GetTextMapPropagator().Inject(ctx, + propagation.HeaderCarrier(natsMsg.Header), + ) + + return p.conn.PublishMsg(natsMsg) } func (p publisher) GenerateReplySubject() string { diff --git a/pkg/messaging/nats/subscriber.go b/pkg/messaging/nats/subscriber.go index e8b0b39..3b1fc4c 100644 --- a/pkg/messaging/nats/subscriber.go +++ b/pkg/messaging/nats/subscriber.go @@ -2,8 +2,13 @@ package nats import ( "errors" + + "context" + "github.com/c12s/magnetar/pkg/messaging" "github.com/nats-io/nats.go" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" ) type subscriber struct { @@ -24,16 +29,30 @@ func NewSubscriber(conn *nats.Conn, subject, queue string) (messaging.Subscriber }, nil } -func (s *subscriber) Subscribe(handler func(msg []byte, replySubject string)) error { +func (s *subscriber) Subscribe(handler func(ctx context.Context, msg []byte, replySubject string)) error { if s.subscription != nil { return errors.New("already subscribed") } + subscription, err := s.conn.QueueSubscribe(s.subject, s.queue, func(msg *nats.Msg) { - handler(msg.Data, msg.Reply) + + propagator := otel.GetTextMapPropagator() + + ctx := propagator.Extract(context.Background(), + propagation.HeaderCarrier(msg.Header), + ) + + tracer := otel.Tracer("nats.subscriber") + ctx, span := tracer.Start(ctx, "NATS Receive "+s.subject) + defer span.End() + + handler(ctx, msg.Data, msg.Reply) }) + if err != nil { return err } + s.subscription = subscription return nil }