diff --git a/go.mod b/go.mod index 4b9ee32..791a23e 100755 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/c12s/kuiper -go 1.22.3 +go 1.24.0 require ( github.com/c12s/magnetar v1.0.0 @@ -12,36 +12,47 @@ require ( github.com/gorilla/mux v1.8.1 github.com/nats-io/nats.go v1.31.0 go.etcd.io/etcd/client/v3 v3.5.13 - google.golang.org/grpc v1.65.0 - google.golang.org/protobuf v1.34.1 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.66.0 + go.opentelemetry.io/otel v1.41.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.41.0 + go.opentelemetry.io/otel/sdk v1.41.0 + google.golang.org/grpc v1.79.1 + google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) require ( + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/klauspost/compress v1.17.0 // indirect - github.com/kr/pretty v0.1.0 // indirect - github.com/kr/text v0.2.0 // indirect github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.etcd.io/etcd/api/v3 v3.5.13 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.41.0 // indirect + go.opentelemetry.io/otel/metric v1.41.0 // indirect + go.opentelemetry.io/otel/trace v1.41.0 // indirect + go.opentelemetry.io/proto/otlp v1.9.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.17.0 // indirect - golang.org/x/crypto v0.23.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect - gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/net v0.50.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/text v0.34.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect ) replace github.com/c12s/magnetar => ../magnetar diff --git a/go.sum b/go.sum index 8097099..87b02d8 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,20 @@ +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -14,20 +22,20 @@ github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17w github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= @@ -42,11 +50,13 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/etcd/api/v3 v3.5.13 h1:8WXU2/NBge6AUF1K1gOexB6e07NgsN1hXK0rSTtgSp4= @@ -55,8 +65,30 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.13 h1:RVZSAnWWWiI5IrYAXjQorajncORbS0zI48LQlE2 go.etcd.io/etcd/client/pkg/v3 v3.5.13/go.mod h1:XxHT4u1qU12E2+po+UVPrEeL94Um6zL58ppuJWXSAB8= go.etcd.io/etcd/client/v3 v3.5.13 h1:o0fHTNJLeO0MyVbc7I3fsCf6nrOqn5d+diSarKnB2js= go.etcd.io/etcd/client/v3 v3.5.13/go.mod h1:cqiAeY8b5DEEcpxvgWKsbLIWNM/8Wy2xJSDMtioMcoI= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.66.0 h1:w/o339tDd6Qtu3+ytwt+/jon2yjAs3Ot8Xq8pelfhSo= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.66.0/go.mod h1:pdhNtM9C4H5fRdrnwO7NjxzQWhKSSxCHk/KluVqDVC0= +go.opentelemetry.io/otel v1.41.0 h1:YlEwVsGAlCvczDILpUXpIpPSL/VPugt7zHThEMLce1c= +go.opentelemetry.io/otel v1.41.0/go.mod h1:Yt4UwgEKeT05QbLwbyHXEwhnjxNO6D8L5PQP51/46dE= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.41.0 h1:ao6Oe+wSebTlQ1OEht7jlYTzQKE+pnx/iNywFvTbuuI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.41.0/go.mod h1:u3T6vz0gh/NVzgDgiwkgLxpsSF6PaPmo2il0apGJbls= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.41.0 h1:mq/Qcf28TWz719lE3/hMB4KkyDuLJIvgJnFGcd0kEUI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.41.0/go.mod h1:yk5LXEYhsL2htyDNJbEq7fWzNEigeEdV5xBF/Y+kAv0= +go.opentelemetry.io/otel/metric v1.41.0 h1:rFnDcs4gRzBcsO9tS8LCpgR0dxg4aaxWlJxCno7JlTQ= +go.opentelemetry.io/otel/metric v1.41.0/go.mod h1:xPvCwd9pU0VN8tPZYzDZV/BMj9CM9vs00GuBjeKhJps= +go.opentelemetry.io/otel/sdk v1.41.0 h1:YPIEXKmiAwkGl3Gu1huk1aYWwtpRLeskpV+wPisxBp8= +go.opentelemetry.io/otel/sdk v1.41.0/go.mod h1:ahFdU0G5y8IxglBf0QBJXgSe7agzjE4GiTJ6HT9ud90= +go.opentelemetry.io/otel/sdk/metric v1.41.0 h1:siZQIYBAUd1rlIWQT2uCxWJxcCO7q3TriaMlf08rXw8= +go.opentelemetry.io/otel/sdk/metric v1.41.0/go.mod h1:HNBuSvT7ROaGtGI50ArdRLUnvRTRGniSUZbxiWxSO8Y= +go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0= +go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U= @@ -64,28 +96,28 @@ go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -94,17 +126,19 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 h1:7whR9kGa5LUwFtpLm2ArCEejtnxlGeLbAyjFY8sGNFw= -google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go.mod h1:99sLkeliLXfdj2J75X3Ho+rrVCaJze0uwN7zDDkjPVU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= -google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= -google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0= +google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= +google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/internal/configs/config.go b/internal/configs/config.go index a082ce7..d60a444 100755 --- a/internal/configs/config.go +++ b/internal/configs/config.go @@ -15,6 +15,8 @@ type Config struct { webhooksAddress string webhookUrl string tokenKey string + jaegerHost string + jaegerGRPCPort string } func (c *Config) NatsAddress() string { @@ -57,6 +59,13 @@ func (c *Config) TokenKey() string { return c.tokenKey } +func (c *Config) JaegerGRPCEndpoint() string { + if c.jaegerHost == "" || c.jaegerGRPCPort == "" { + return "" + } + return c.jaegerHost + ":" + c.jaegerGRPCPort +} + func NewFromEnv() (*Config, error) { return &Config{ natsAddress: os.Getenv("NATS_ADDRESS"), @@ -69,5 +78,7 @@ func NewFromEnv() (*Config, error) { webhooksAddress: os.Getenv("WEBHOOK_ADDRESS"), webhookUrl: os.Getenv("WEBHOOK_URL"), tokenKey: os.Getenv("SECRET_KEY"), + jaegerHost: os.Getenv("JAEGER_HOST"), + jaegerGRPCPort: os.Getenv("JAEGER_GRPC_PORT"), }, nil } diff --git a/internal/services/config_group.go b/internal/services/config_group.go index 5105ec5..f2fad98 100644 --- a/internal/services/config_group.go +++ b/internal/services/config_group.go @@ -13,6 +13,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" "gopkg.in/yaml.v3" + + "go.opentelemetry.io/otel" ) type ConfigGroupService struct { @@ -34,6 +36,10 @@ func NewConfigGroupService(administrator *oortapi.AdministrationAsyncClient, aut } func (s *ConfigGroupService) Put(ctx context.Context, config *domain.ConfigGroup, schema *quasarapi.ConfigSchemaDetails) (*domain.ConfigGroup, *domain.Error) { + tracer := otel.Tracer("kuiper.ConfigGroupService") + ctx, span := tracer.Start(ctx, "Put") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigPut, OortResOrg, string(config.Org())) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigPut)) } @@ -72,7 +78,7 @@ func (s *ConfigGroupService) Put(ctx context.Context, config *domain.ConfigGroup if err != nil { return nil, err } - err2 := s.administrator.SendRequest(&oortapi.CreateInheritanceRelReq{ + err2 := s.administrator.SendRequest(ctx, &oortapi.CreateInheritanceRelReq{ From: &oortapi.Resource{ Id: fmt.Sprintf("%s/%s", config.Org(), config.Namespace()), Kind: OortResNamespace, @@ -91,6 +97,10 @@ func (s *ConfigGroupService) Put(ctx context.Context, config *domain.ConfigGroup } func (s *ConfigGroupService) Get(ctx context.Context, org domain.Org, namespace, name, version string) (*domain.ConfigGroup, *domain.Error) { + tracer := otel.Tracer("kuiper.ConfigGroupService") + ctx, span := tracer.Start(ctx, "Get") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigGet, OortResConfig, OortConfigId(domain.ConfTypeGroup, string(org), namespace, name, version)) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigGet)) } @@ -98,6 +108,10 @@ func (s *ConfigGroupService) Get(ctx context.Context, org domain.Org, namespace, } func (s *ConfigGroupService) List(ctx context.Context, org domain.Org, namespace string) ([]*domain.ConfigGroup, *domain.Error) { + tracer := otel.Tracer("kuiper.ConfigGroupService") + ctx, span := tracer.Start(ctx, "List") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigGet, OortResOrg, string(org)) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigGet)) } @@ -105,6 +119,10 @@ func (s *ConfigGroupService) List(ctx context.Context, org domain.Org, namespace } func (s *ConfigGroupService) Delete(ctx context.Context, org domain.Org, namespace, name, version string) (*domain.ConfigGroup, *domain.Error) { + tracer := otel.Tracer("kuiper.ConfigGroupService") + ctx, span := tracer.Start(ctx, "Delete") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigPut, OortResConfig, OortConfigId(domain.ConfTypeGroup, string(org), namespace, name, version)) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigPut)) } @@ -112,6 +130,10 @@ func (s *ConfigGroupService) Delete(ctx context.Context, org domain.Org, namespa } func (s *ConfigGroupService) Diff(ctx context.Context, referenceOrg domain.Org, referenceNamespace, referenceName, referenceVersion string, diffOrg domain.Org, diffNamespace, diffName, diffVersion string) (map[string][]domain.Diff, *domain.Error) { + tracer := otel.Tracer("kuiper.ConfigGroupService") + ctx, span := tracer.Start(ctx, "Diff") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigGet, OortResConfig, OortConfigId(domain.ConfTypeGroup, string(referenceOrg), referenceNamespace, referenceName, referenceVersion)) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigGet)) } @@ -130,6 +152,10 @@ func (s *ConfigGroupService) Diff(ctx context.Context, referenceOrg domain.Org, } func (s *ConfigGroupService) Place(ctx context.Context, org domain.Org, namespace, name, version string, strategy *api.PlaceReq_Strategy) ([]domain.PlacementTask, *domain.Error) { + tracer := otel.Tracer("kuiper.ConfigGroupService") + ctx, span := tracer.Start(ctx, "Place") + defer span.End() + config, err := s.store.Get(ctx, org, namespace, name, version) if err != nil { return nil, err @@ -163,6 +189,10 @@ func (s *ConfigGroupService) Place(ctx context.Context, org domain.Org, namespac } func (s *ConfigGroupService) ListPlacementTasks(ctx context.Context, org domain.Org, namespace, name, version string) ([]domain.PlacementTask, *domain.Error) { + tracer := otel.Tracer("kuiper.ConfigGroupService") + ctx, span := tracer.Start(ctx, "ListPlacementTasks") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigGet, OortResConfig, OortConfigId(domain.ConfTypeGroup, string(org), namespace, name, version)) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigGet)) } diff --git a/internal/services/placement.go b/internal/services/placement.go index 0a84f0f..01c1484 100644 --- a/internal/services/placement.go +++ b/internal/services/placement.go @@ -14,6 +14,7 @@ import ( magnetarapi "github.com/c12s/magnetar/pkg/api" oortapi "github.com/c12s/oort/pkg/api" "github.com/google/uuid" + "go.opentelemetry.io/otel" "google.golang.org/grpc/metadata" ) @@ -38,6 +39,10 @@ func NewPlacementStore(magnetar magnetarapi.MagnetarClient, aq agent_queue.Agent } func (s *PlacementService) Place(ctx context.Context, config domain.Config, strategy *api.PlaceReq_Strategy, cmd func(taskId string) ([]byte, *domain.Error), webhookPath string) ([]domain.PlacementTask, *domain.Error) { + tracer := otel.Tracer("kuiper.PlacementService") + ctx, span := tracer.Start(ctx, "Place") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigGet, OortResConfig, OortConfigId(config.Type(), string(config.Org()), config.Namespace(), config.Name(), config.Version())) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigGet)) } @@ -86,6 +91,10 @@ func (s *PlacementService) Place(ctx context.Context, config domain.Config, stra } func (s *PlacementService) placeByQuery(ctx context.Context, config domain.Config, nodeQuery []*magnetarapi.Selector) ([]*magnetarapi.NodeStringified, *domain.Error) { + tracer := otel.Tracer("kuiper.PlacementService") + ctx, span := tracer.Start(ctx, "placeByQuery") + defer span.End() + queryReq := &magnetarapi.QueryOrgOwnedNodesReq{ Org: string(config.Org()), } @@ -109,6 +118,10 @@ func (s *PlacementService) placeByQuery(ctx context.Context, config domain.Confi } func (s *PlacementService) placeByGossip(ctx context.Context, config domain.Config, percentage int32) ([]*magnetarapi.NodeStringified, *domain.Error) { + tracer := otel.Tracer("kuiper.PlacementService") + ctx, span := tracer.Start(ctx, "placeByGossip") + defer span.End() + queryReq := &magnetarapi.ListOrgOwnedNodesReq{ Org: string(config.Org()), } @@ -147,10 +160,18 @@ func selectRandmNodes(nodes []*magnetarapi.NodeStringified, percentage int32) [] } func (s *PlacementService) List(ctx context.Context, org domain.Org, namespace, name, version, configType string) ([]domain.PlacementTask, *domain.Error) { + tracer := otel.Tracer("kuiper.PlacementService") + ctx, span := tracer.Start(ctx, "List") + defer span.End() + return s.store.ListByConfig(ctx, org, namespace, name, version, configType) } func (s *PlacementService) UpdateStatus(ctx context.Context, org domain.Org, namespace, name, version, configType, taskId string, status domain.PlacementTaskStatus) *domain.Error { + tracer := otel.Tracer("kuiper.PlacementService") + ctx, span := tracer.Start(ctx, "UpdateStatus") + defer span.End() + return s.store.UpdateStatus(ctx, org, namespace, name, version, configType, taskId, status) } diff --git a/internal/services/standalone_config.go b/internal/services/standalone_config.go index 9dda171..161a3f4 100644 --- a/internal/services/standalone_config.go +++ b/internal/services/standalone_config.go @@ -14,6 +14,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" "gopkg.in/yaml.v3" + + "go.opentelemetry.io/otel" ) type StandaloneConfigService struct { @@ -37,6 +39,10 @@ func NewStandaloneConfigService(administrator *oortapi.AdministrationAsyncClient } func (s *StandaloneConfigService) Put(ctx context.Context, config *domain.StandaloneConfig, schema *quasarapi.ConfigSchemaDetails) (*domain.StandaloneConfig, *domain.Error) { + tracer := otel.Tracer("kuiper.StandaloneConfigService") + ctx, span := tracer.Start(ctx, "Put") + defer span.End() + ctx = s.authorizer.SetOutgoingContext(ctx) _, err := s.meridian.GetNamespace(ctx, &meridian_api.GetNamespaceReq{ OrgId: string(config.Org()), @@ -82,7 +88,7 @@ func (s *StandaloneConfigService) Put(ctx context.Context, config *domain.Standa if putErr != nil { return nil, putErr } - err2 := s.administrator.SendRequest(&oortapi.CreateInheritanceRelReq{ + err2 := s.administrator.SendRequest(ctx, &oortapi.CreateInheritanceRelReq{ From: &oortapi.Resource{ Id: fmt.Sprintf("%s/%s", config.Org(), config.Namespace()), Kind: OortResNamespace, @@ -101,6 +107,10 @@ func (s *StandaloneConfigService) Put(ctx context.Context, config *domain.Standa } func (s *StandaloneConfigService) Get(ctx context.Context, org domain.Org, namespace, name, version string) (*domain.StandaloneConfig, *domain.Error) { + tracer := otel.Tracer("kuiper.StandaloneConfigService") + ctx, span := tracer.Start(ctx, "Get") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigGet, OortResConfig, OortConfigId(domain.ConfTypeStandalone, string(org), namespace, name, version)) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigGet)) } @@ -108,6 +118,10 @@ func (s *StandaloneConfigService) Get(ctx context.Context, org domain.Org, names } func (s *StandaloneConfigService) List(ctx context.Context, org domain.Org, namespace string) ([]*domain.StandaloneConfig, *domain.Error) { + tracer := otel.Tracer("kuiper.StandaloneConfigService") + ctx, span := tracer.Start(ctx, "List") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigGet, OortResOrg, string(org)) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigGet)) } @@ -115,6 +129,10 @@ func (s *StandaloneConfigService) List(ctx context.Context, org domain.Org, name } func (s *StandaloneConfigService) Delete(ctx context.Context, org domain.Org, namespace, name, version string) (*domain.StandaloneConfig, *domain.Error) { + tracer := otel.Tracer("kuiper.StandaloneConfigService") + ctx, span := tracer.Start(ctx, "Delete") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigPut, OortResConfig, OortConfigId(domain.ConfTypeStandalone, string(org), namespace, name, version)) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigPut)) } @@ -122,6 +140,10 @@ func (s *StandaloneConfigService) Delete(ctx context.Context, org domain.Org, na } func (s *StandaloneConfigService) Diff(ctx context.Context, referenceOrg domain.Org, referenceNamespace, referenceName, referenceVersion string, diffOrg domain.Org, diffNamespace, diffName, diffVersion string) ([]domain.Diff, *domain.Error) { + tracer := otel.Tracer("kuiper.StandaloneConfigService") + ctx, span := tracer.Start(ctx, "Diff") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigGet, OortResConfig, OortConfigId(domain.ConfTypeStandalone, string(referenceOrg), referenceNamespace, referenceName, referenceVersion)) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigGet)) } @@ -140,6 +162,10 @@ func (s *StandaloneConfigService) Diff(ctx context.Context, referenceOrg domain. } func (s *StandaloneConfigService) Place(ctx context.Context, org domain.Org, namespace, name, version string, strategy *api.PlaceReq_Strategy) ([]domain.PlacementTask, *domain.Error) { + tracer := otel.Tracer("kuiper.StandaloneConfigService") + ctx, span := tracer.Start(ctx, "Place") + defer span.End() + config, err := s.store.Get(ctx, org, namespace, name, version) if err != nil { return nil, err @@ -173,6 +199,10 @@ func (s *StandaloneConfigService) Place(ctx context.Context, org domain.Org, nam } func (s *StandaloneConfigService) ListPlacementTasks(ctx context.Context, org domain.Org, namespace, name, version string) ([]domain.PlacementTask, *domain.Error) { + tracer := otel.Tracer("kuiper.StandaloneConfigService") + ctx, span := tracer.Start(ctx, "ListPlacementTasks") + defer span.End() + if !s.authorizer.Authorize(ctx, PermConfigGet, OortResConfig, OortConfigId(domain.ConfTypeStandalone, string(org), namespace, name, version)) { return nil, domain.NewError(domain.ErrTypeUnauthorized, fmt.Sprintf("Permission denied: %s", PermConfigGet)) } diff --git a/internal/startup/app.go b/internal/startup/app.go index 6ed82c7..d992b1c 100755 --- a/internal/startup/app.go +++ b/internal/startup/app.go @@ -12,6 +12,7 @@ import ( "github.com/c12s/kuiper/internal/services" "github.com/c12s/kuiper/internal/store" "github.com/gorilla/mux" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "github.com/c12s/kuiper/internal/configs" "github.com/c12s/kuiper/internal/servers" @@ -85,7 +86,10 @@ func (a *app) init() { configGroupService := services.NewConfigGroupService(administratorClient, authzService, configGroupStore, placementService, quasarClient) kuiperGrpcServer := servers.NewKuiperServer(standaloneConfigService, configGroupService) - s := grpc.NewServer(grpc.UnaryInterceptor(servers.GetAuthInterceptor())) + s := grpc.NewServer( + grpc.StatsHandler(otelgrpc.NewServerHandler()), + grpc.UnaryInterceptor(servers.GetAuthInterceptor()), + ) api.RegisterKuiperServer(s, kuiperGrpcServer) reflection.Register(s) a.grpcServer = s @@ -120,6 +124,11 @@ func (a *app) startWebhooks() { } func (a *app) Start() error { + shutdownTracing := initTracing( + "kuiper", + a.config.JaegerGRPCEndpoint(), + ) + a.shutdownProcesses = append(a.shutdownProcesses, shutdownTracing) a.init() go a.startWebhooks() return a.startGrpcServer() diff --git a/internal/startup/tracing.go b/internal/startup/tracing.go new file mode 100644 index 0000000..911f969 --- /dev/null +++ b/internal/startup/tracing.go @@ -0,0 +1,45 @@ +package startup + +import ( + "context" + "log" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" +) + +func initTracing(serviceName, endpoint string) func() { + exp, err := otlptracegrpc.New( + context.Background(), + otlptracegrpc.WithEndpoint(endpoint), + otlptracegrpc.WithInsecure(), + ) + if err != nil { + log.Fatalf("otlp grpc exporter error: %v", err) + } + + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + sdktrace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(serviceName), + )), + ) + + otel.SetTracerProvider(tp) + + otel.SetTextMapPropagator( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ) + + return func() { + _ = tp.Shutdown(context.Background()) + } +} diff --git a/internal/store/config_group_etcd.go b/internal/store/config_group_etcd.go index 9a17e58..33141d8 100644 --- a/internal/store/config_group_etcd.go +++ b/internal/store/config_group_etcd.go @@ -8,6 +8,7 @@ import ( "github.com/c12s/kuiper/internal/domain" clientv3 "go.etcd.io/etcd/client/v3" + "go.opentelemetry.io/otel" ) type ConfigGroupEtcdStore struct { @@ -21,6 +22,10 @@ func NewConfigGroupEtcdStore(client *clientv3.Client) domain.ConfigGroupStore { } func (s ConfigGroupEtcdStore) Put(ctx context.Context, config *domain.ConfigGroup) *domain.Error { + tracer := otel.Tracer("kuiper.ConfigGroupEtcdStore") + ctx, span := tracer.Start(ctx, "Put") + defer span.End() + dao := ConfigGroupDAO{ Org: string(config.Org()), Namespace: config.Namespace(), @@ -56,6 +61,10 @@ func (s ConfigGroupEtcdStore) Put(ctx context.Context, config *domain.ConfigGrou } func (s ConfigGroupEtcdStore) Get(ctx context.Context, org domain.Org, namespace, name, version string) (*domain.ConfigGroup, *domain.Error) { + tracer := otel.Tracer("kuiper.ConfigGroupEtcdStore") + ctx, span := tracer.Start(ctx, "Get") + defer span.End() + key := ConfigGroupDAO{ Org: string(org), Namespace: namespace, @@ -85,6 +94,10 @@ func (s ConfigGroupEtcdStore) Get(ctx context.Context, org domain.Org, namespace } func (s ConfigGroupEtcdStore) List(ctx context.Context, org domain.Org, namespace string) ([]*domain.ConfigGroup, *domain.Error) { + tracer := otel.Tracer("kuiper.ConfigGroupEtcdStore") + ctx, span := tracer.Start(ctx, "List") + defer span.End() + key := ConfigGroupDAO{ Org: string(org), Namespace: namespace, @@ -114,6 +127,10 @@ func (s ConfigGroupEtcdStore) List(ctx context.Context, org domain.Org, namespac } func (s ConfigGroupEtcdStore) Delete(ctx context.Context, org domain.Org, namespace, name, version string) (*domain.ConfigGroup, *domain.Error) { + tracer := otel.Tracer("kuiper.ConfigGroupEtcdStore") + ctx, span := tracer.Start(ctx, "Delete") + defer span.End() + key := ConfigGroupDAO{ Org: string(org), Namespace: namespace, diff --git a/internal/store/placement_etcd.go b/internal/store/placement_etcd.go index b6d7357..dc7bcd7 100644 --- a/internal/store/placement_etcd.go +++ b/internal/store/placement_etcd.go @@ -9,6 +9,7 @@ import ( "github.com/c12s/kuiper/internal/domain" clientv3 "go.etcd.io/etcd/client/v3" + "go.opentelemetry.io/otel" ) type PlacementEtcdStore struct { @@ -22,6 +23,10 @@ func NewPlacementEtcdStore(client *clientv3.Client) domain.PlacementStore { } func (s PlacementEtcdStore) Place(ctx context.Context, config domain.Config, req *domain.PlacementTask) *domain.Error { + tracer := otel.Tracer("kuiper.PlacementEtcdStore") + ctx, span := tracer.Start(ctx, "Place") + defer span.End() + dao := PlacementTaskDAO{ Id: req.Id(), Org: string(config.Org()), @@ -48,6 +53,10 @@ func (s PlacementEtcdStore) Place(ctx context.Context, config domain.Config, req } func (s PlacementEtcdStore) ListByConfig(ctx context.Context, org domain.Org, namespace, name string, version, configType string) ([]domain.PlacementTask, *domain.Error) { + tracer := otel.Tracer("kuiper.PlacementEtcdStore") + ctx, span := tracer.Start(ctx, "ListByConfig") + defer span.End() + key := PlacementTaskDAO{ Org: string(org), Namespace: namespace, @@ -73,6 +82,10 @@ func (s PlacementEtcdStore) ListByConfig(ctx context.Context, org domain.Org, na } func (s PlacementEtcdStore) UpdateStatus(ctx context.Context, org domain.Org, namespace, name string, version string, configType string, taskId string, status domain.PlacementTaskStatus) *domain.Error { + tracer := otel.Tracer("kuiper.PlacementEtcdStore") + ctx, span := tracer.Start(ctx, "UpdateStatus") + defer span.End() + key := PlacementTaskDAO{ Id: taskId, Org: string(org), diff --git a/internal/store/standalone_config_etcd.go b/internal/store/standalone_config_etcd.go index 775464d..b5f3783 100644 --- a/internal/store/standalone_config_etcd.go +++ b/internal/store/standalone_config_etcd.go @@ -8,6 +8,7 @@ import ( "github.com/c12s/kuiper/internal/domain" clientv3 "go.etcd.io/etcd/client/v3" + "go.opentelemetry.io/otel" ) type StandaloneConfigEtcdStore struct { @@ -21,6 +22,10 @@ func NewStandaloneConfigEtcdStore(client *clientv3.Client) domain.StandaloneConf } func (s StandaloneConfigEtcdStore) Put(ctx context.Context, config *domain.StandaloneConfig) *domain.Error { + tracer := otel.Tracer("kuiper.StandaloneConfigEtcdStore") + ctx, span := tracer.Start(ctx, "Put") + defer span.End() + dao := StandaloneConfigDAO{ Org: string(config.Org()), Namespace: config.Namespace(), @@ -47,6 +52,10 @@ func (s StandaloneConfigEtcdStore) Put(ctx context.Context, config *domain.Stand } func (s StandaloneConfigEtcdStore) Get(ctx context.Context, org domain.Org, namespace, name, version string) (*domain.StandaloneConfig, *domain.Error) { + tracer := otel.Tracer("kuiper.StandaloneConfigEtcdStore") + ctx, span := tracer.Start(ctx, "Get") + defer span.End() + key := StandaloneConfigDAO{ Org: string(org), Namespace: namespace, @@ -72,6 +81,10 @@ func (s StandaloneConfigEtcdStore) Get(ctx context.Context, org domain.Org, name } func (s StandaloneConfigEtcdStore) List(ctx context.Context, org domain.Org, namespace string) ([]*domain.StandaloneConfig, *domain.Error) { + tracer := otel.Tracer("kuiper.StandaloneConfigEtcdStore") + ctx, span := tracer.Start(ctx, "List") + defer span.End() + key := StandaloneConfigDAO{ Org: string(org), Namespace: namespace, @@ -96,6 +109,10 @@ func (s StandaloneConfigEtcdStore) List(ctx context.Context, org domain.Org, nam } func (s StandaloneConfigEtcdStore) Delete(ctx context.Context, org domain.Org, namespace, name, version string) (*domain.StandaloneConfig, *domain.Error) { + tracer := otel.Tracer("kuiper.StandaloneConfigEtcdStore") + ctx, span := tracer.Start(ctx, "Delete") + defer span.End() + key := StandaloneConfigDAO{ Org: string(org), Namespace: namespace, diff --git a/pkg/api/kuiper_async.go b/pkg/api/kuiper_async.go index ecde015..f21be6c 100755 --- a/pkg/api/kuiper_async.go +++ b/pkg/api/kuiper_async.go @@ -1,12 +1,14 @@ package api import ( + context "context" "fmt" "log" "github.com/c12s/magnetar/pkg/messaging" "github.com/c12s/magnetar/pkg/messaging/nats" natsgo "github.com/nats-io/nats.go" + "go.opentelemetry.io/otel" "google.golang.org/protobuf/proto" ) @@ -35,71 +37,80 @@ func NewKuiperAsyncClient(address, nodeId string) (*KuiperAsyncClient, error) { } func (c *KuiperAsyncClient) ReceiveConfig(standaloneHandler PutStandaloneConfigHandler, groupHandler PutConfigGroupHandler) error { - err := c.subscriber.Subscribe(func(msg []byte, replySubject string) { + return c.subscriber.Subscribe(func(ctx context.Context, msg []byte, replySubject string) { + tracer := otel.Tracer("kuiper.async") + ctx, span := tracer.Start(ctx, "ReceiveConfig") + defer span.End() + cmd := &ApplyConfigCommand{} err := proto.Unmarshal(msg, cmd) if err != nil { + span.RecordError(err) log.Println(err) return } + switch cmd.Type { case "standalone": config := &StandaloneConfig{} err := proto.Unmarshal(cmd.Config, config) if err != nil { + span.RecordError(err) log.Println(err) return } - err = standaloneHandler(config, cmd.Namespace, cmd.Strategy) - reply := &ApplyConfigReply{ - Cmd: cmd, - } + err = standaloneHandler(ctx, config, cmd.Namespace, cmd.Strategy) + reply := &ApplyConfigReply{Cmd: cmd} if err != nil { - log.Println(err) + span.RecordError(err) reply.Status = TaskStatus_Failed } else { reply.Status = TaskStatus_Placed } msg, err := proto.Marshal(reply) if err != nil { + span.RecordError(err) log.Println(err) return } - err = c.publisher.Publish(msg, replySubject) + err = c.publisher.Publish(ctx, msg, replySubject) if err != nil { + span.RecordError(err) log.Println(err) } + case "group": config := &ConfigGroup{} err := proto.Unmarshal(cmd.Config, config) if err != nil { + span.RecordError(err) log.Println(err) return } - err = groupHandler(config, cmd.Namespace, cmd.Strategy) - reply := &ApplyConfigReply{ - Cmd: cmd, - } + err = groupHandler(ctx, config, cmd.Namespace, cmd.Strategy) + reply := &ApplyConfigReply{Cmd: cmd} if err != nil { - log.Println(err) + span.RecordError(err) reply.Status = TaskStatus_Failed } else { reply.Status = TaskStatus_Placed } msg, err := proto.Marshal(reply) if err != nil { + span.RecordError(err) log.Println(err) return } - err = c.publisher.Publish(msg, replySubject) + err = c.publisher.Publish(ctx, msg, replySubject) if err != nil { + span.RecordError(err) log.Println(err) } + default: log.Printf("unknown cmd type %s", cmd.Type) } }) - return err } func (c *KuiperAsyncClient) GracefulStop() { @@ -109,8 +120,8 @@ func (c *KuiperAsyncClient) GracefulStop() { } } -type PutStandaloneConfigHandler func(config *StandaloneConfig, namespace, strategy string) error -type PutConfigGroupHandler func(config *ConfigGroup, namespace, strategy string) error +type PutStandaloneConfigHandler func(ctx context.Context, config *StandaloneConfig, namespace, strategy string) error +type PutConfigGroupHandler func(ctx context.Context, config *ConfigGroup, namespace, strategy string) error func Subject(nodeId string) string { return fmt.Sprintf("%s.configs", nodeId)