diff --git a/golang/bin/run.sh b/golang/bin/run.sh index c0d37d0..7bb3d0c 100644 --- a/golang/bin/run.sh +++ b/golang/bin/run.sh @@ -21,4 +21,4 @@ cd ../common && mvn -Prelease -DskipTests clean package -U # set env for mqadmin (use source to set linux env variables in current shell) cd ../rocketmq-admintools && source bin/env.sh # run go e2e test case with latest client version -cd ../golang && go get -u github.com/apache/rocketmq-clients/golang && go test ./mqgotest/... -timeout 2m -v +cd ../golang && go get -u github.com/apache/rocketmq-clients/golang && go test ./mqgotest/... -timeout 2m -v && go test ./pkg/... -timeout 2m -v diff --git a/golang/go.mod b/golang/go.mod index 8196eff..1e6b5b5 100644 --- a/golang/go.mod +++ b/golang/go.mod @@ -15,13 +15,13 @@ require ( github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.14.0 // indirect + github.com/go-playground/validator/v10 v10.14.1 // indirect github.com/golang/glog v1.1.1 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.3.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/leodido/go-urn v1.2.4 // indirect github.com/natefinch/lumberjack v2.0.0+incompatible // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -30,14 +30,16 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/crypto v0.9.0 // indirect - golang.org/x/net v0.10.0 // indirect - golang.org/x/sync v0.2.0 // indirect - golang.org/x/sys v0.8.0 // indirect - golang.org/x/text v0.9.0 // indirect - google.golang.org/api v0.124.0 // indirect - google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect - google.golang.org/grpc v1.55.0 // indirect - google.golang.org/protobuf v1.30.0 // indirect + golang.org/x/crypto v0.10.0 // indirect + golang.org/x/net v0.11.0 // indirect + golang.org/x/sync v0.3.0 // indirect + golang.org/x/sys v0.9.0 // indirect + golang.org/x/text v0.10.0 // indirect + google.golang.org/api v0.129.0 // indirect + google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 // indirect + google.golang.org/grpc v1.56.1 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/golang/go.sum b/golang/go.sum index eda5a6a..3fbff5f 100644 --- a/golang/go.sum +++ b/golang/go.sum @@ -60,6 +60,8 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js= github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= +github.com/go-playground/validator/v10 v10.14.1 h1:9c50NUPC30zyuKprjL3vNZ0m5oG+jU0zvx4AqHGnv4k= +github.com/go-playground/validator/v10 v10.14.1/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.1.1 h1:jxpi2eWoU84wbX9iIEyAeeoac3FLuifZpY9tcNUD9kw= github.com/golang/glog v1.1.1/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= @@ -117,6 +119,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.14.6/go.mod h1:zdiPV4Yse/1gnckTHtghG4G github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2 h1:gDLXvp5S9izjldquuoAhDzccbskOL6tDC5jMSyx3zxE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.2/go.mod h1:7pdNwVWBBHGiCxa9lAszqCJMbfTISJ7oMftp8+UGV08= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -172,6 +176,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM= +golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -225,6 +231,8 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.11.0 h1:Gi2tvZIJyBtO9SDr1q9h5hEQCp/4L2RQ+ar0qjx2oNU= +golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -240,6 +248,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI= golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -266,6 +276,8 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= +golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -274,6 +286,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.10.0 h1:UpjohKhiEgNc0CSauXmwYftY1+LlaC75SJwh0SgCX58= +golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -327,6 +341,14 @@ google.golang.org/api v0.123.0 h1:yHVU//vA+qkOhm4reEC9LtzHVUCN/IqqNRl1iQ9xE20= google.golang.org/api v0.123.0/go.mod h1:gcitW0lvnyWjSp9nKxAbdHKIZ6vF4aajGueeslZOyms= google.golang.org/api v0.124.0 h1:dP6Ef1VgOGqQ8eiv4GiY8RhmeyqzovcXBYPDUYG8Syo= google.golang.org/api v0.124.0/go.mod h1:xu2HQurE5gi/3t1aFCvhPD781p0a3p11sdunTJ2BlP4= +google.golang.org/api v0.125.0 h1:7xGvEY4fyWbhWMHf3R2/4w7L4fXyfpRGE9g6lp8+DCk= +google.golang.org/api v0.125.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw= +google.golang.org/api v0.126.0 h1:q4GJq+cAdMAC7XP7njvQ4tvohGLiSlytuL4BQxbIZ+o= +google.golang.org/api v0.126.0/go.mod h1:mBwVAtz+87bEN6CbA1GtZPDOqY2R5ONPqJeIlvyo4Aw= +google.golang.org/api v0.127.0 h1:v7rj0vA0imM3Ou81k1eyFxQNScLzn71EyGnJDr+V/XI= +google.golang.org/api v0.127.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750= +google.golang.org/api v0.129.0 h1:2XbdjjNfFPXQyufzQVwPf1RRnHH8Den2pfNE2jw7L8w= +google.golang.org/api v0.129.0/go.mod h1:dFjiXlanKwWE3612X97llhsoI36FAoIiRj3aTl5b/zE= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -355,6 +377,28 @@ google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEY google.golang.org/genproto v0.0.0-20200527145253-8367513e4ece/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/genproto v0.0.0-20230526015343-6ee61e4f9d5f h1:DwRdHa3+SynqBR2tx3LVtzJrGooL9hg1OCAfBdQAk1A= +google.golang.org/genproto v0.0.0-20230526015343-6ee61e4f9d5f/go.mod h1:9ExIQyXL5hZrHzQceCwuSYwZZ5QZBazOcprJ5rgs3lY= +google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e h1:Ao9GzfUMPH3zjVfzXG5rlWlk+Q8MXWKwWpwVQE1MXfw= +google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk= +google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc h1:8DyZCyvI8mE1IdLy/60bS+52xfymkE72wv1asokgtao= +google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:xZnkP7mREFX5MORlOPEzLMr+90PPZQ2QWzrVTWfAq64= +google.golang.org/genproto/googleapis/api v0.0.0-20230526015343-6ee61e4f9d5f h1:dJhNU2ZodW2tHjMhmDOrcRSahqR0wgfOEBs8nSmVx5Y= +google.golang.org/genproto/googleapis/api v0.0.0-20230526015343-6ee61e4f9d5f/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/api v0.0.0-20230526203410-71b5a4ffd15e h1:AZX1ra8YbFMSb7+1pI8S9v4rrgRR7jU1FmuFSSjTVcQ= +google.golang.org/genproto/googleapis/api v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc h1:kVKPf/IiYSBWEWtkIn6wZXwWGCnLKcC8oWfZvXjsGnM= +google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 h1:s5YSX+ZH5b5vS9rnpGymvIyMpLRJizowqDlOuyjXnTk= +google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526015343-6ee61e4f9d5f h1:QNVuVEP2S7NNxLdNdOq0RiW3c9pW4gIpUUd+GAOjk1Y= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526015343-6ee61e4f9d5f/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc h1:XSJ8Vk1SWuNr8S18z1NZSziL0CPIXLCCMDOEFtHBOFc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 h1:DEH99RbiLZhMxrpEJCZ0A+wdTe0EOgou/poSLx9vWf4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -368,6 +412,8 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= +google.golang.org/grpc v1.56.1 h1:z0dNfjIl0VpaZ9iSVjA6daGatAYwPGstTjt5vkRMFkQ= +google.golang.org/grpc v1.56.1/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -382,6 +428,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/golang/mqgotest/delay/delaymsg_test.go b/golang/mqgotest/delay/delaymsg_test.go index 6ae5d8c..547ee89 100644 --- a/golang/mqgotest/delay/delaymsg_test.go +++ b/golang/mqgotest/delay/delaymsg_test.go @@ -65,7 +65,7 @@ func TestDelayMsg(t *testing.T) { }() go func() { for i := 0; i < msgCount; i++ { - var msg = BuildDelayMessage(testTopic, "test", msgtag, time.Duration(delaySeconds), keys) + msg := BuildDelayMessage(testTopic, "test", msgtag, time.Duration(delaySeconds), keys) SendMessage(producer, msg, sendMsgCollector) } }() @@ -80,10 +80,6 @@ func TestDelayMsgAsync(t *testing.T) { wg sync.WaitGroup recvMsgCollector *RecvMsgsCollector sendMsgCollector = NewSendMsgsCollector() - // maximum number of messages received at one time - maxMessageNum int32 = 32 - // invisibleDuration should > 20s - invisibleDuration = time.Second * 20 // receive messages in a loop testTopic = GetTopicName() nameServer = NAMESERVER @@ -94,7 +90,6 @@ func TestDelayMsgAsync(t *testing.T) { cm = GetGroupName() msgtag = RandomString(8) keys = RandomString(8) - msgCount = 10 delaySeconds = 2 ) @@ -110,12 +105,12 @@ func TestDelayMsgAsync(t *testing.T) { wg.Add(1) go func() { - recvMsgCollector = RecvMessage(simpleConsumer, maxMessageNum, invisibleDuration, int64(20+delaySeconds)) + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, int64(20+delaySeconds)) wg.Done() }() go func() { - for i := 0; i < msgCount; i++ { - var msg = BuildDelayMessage(testTopic, "test", msgtag, time.Duration(delaySeconds), keys) + for i := 0; i < MsgCount; i++ { + msg := BuildDelayMessage(testTopic, "test", msgtag, time.Duration(delaySeconds), keys) SendMessageAsync(producer, msg, sendMsgCollector) } }() diff --git a/golang/mqgotest/fifo/ordermsg_test.go b/golang/mqgotest/fifo/ordermsg_test.go index 8f08d18..a7a3d8b 100644 --- a/golang/mqgotest/fifo/ordermsg_test.go +++ b/golang/mqgotest/fifo/ordermsg_test.go @@ -20,7 +20,6 @@ import ( . "rocketmq-go-e2e/utils" "sync" "testing" - "time" ) func TestFiFOMsg(t *testing.T) { @@ -29,10 +28,6 @@ func TestFiFOMsg(t *testing.T) { wg sync.WaitGroup recvMsgCollector *RecvMsgsCollector sendMsgCollector = NewSendMsgsCollector() - // maximum number of messages received at one time - maxMessageNum int32 = 32 - // invisibleDuration should > 20s - invisibleDuration = time.Second * 20 // receive messages in a loop testTopic = GetTopicName() nameServer = NAMESERVER @@ -60,13 +55,13 @@ func TestFiFOMsg(t *testing.T) { wg.Add(1) go func() { - recvMsgCollector = RecvMessage(simpleConsumer, maxMessageNum, invisibleDuration, 60) + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 60) wg.Done() }() go func() { for i := 0; i < msgCount; i++ { - var msg = BuildFIFOMessage(testTopic, "test", msgtag, cm, keys) + msg := BuildFIFOMessage(testTopic, "test", msgtag, cm, keys) SendMessage(producer, msg, sendMsgCollector) } }() diff --git a/golang/pkg/client/consumer/simple_consumer_init_test.go b/golang/pkg/client/consumer/simple_consumer_init_test.go new file mode 100644 index 0000000..b1c4b1e --- /dev/null +++ b/golang/pkg/client/consumer/simple_consumer_init_test.go @@ -0,0 +1,68 @@ +package consumer + +import ( + . "rocketmq-go-e2e/utils" + "testing" + "time" + + rmq_client "github.com/apache/rocketmq-clients/golang" + "github.com/apache/rocketmq-clients/golang/credentials" +) + +func TestSimpleConsumerInit(t *testing.T) { + type args struct { + name, testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + name string + args args + }{ + { + name: "SimpleConsumer all parameters are set properly, expect start success", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(8), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{ + Endpoint: tt.args.grpcEndpoint, + ConsumerGroup: tt.args.cm, + Credentials: &credentials.SessionCredentials{ + AccessKey: tt.args.ak, + AccessSecret: tt.args.sk, + }, + }, + rmq_client.WithAwaitDuration(time.Second*10), + rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{ + tt.args.testTopic: rmq_client.NewFilterExpression("*"), + }), + ) + if err != nil { + t.Fail() + t.Log("Error: ", err) + } + + defer simpleConsumer.GracefulStop() + + // start simpleConsumer + err = simpleConsumer.Start() + if err != nil { + t.Fail() + t.Log("Error: ", err) + } + }) + } +} diff --git a/golang/pkg/message/message_body_content_test.go b/golang/pkg/message/message_body_content_test.go new file mode 100644 index 0000000..b4ba783 --- /dev/null +++ b/golang/pkg/message/message_body_content_test.go @@ -0,0 +1,93 @@ +package rocketmqtest + +import ( + . "rocketmq-go-e2e/utils" + "sync" + "testing" +) + +func TestMessageBodyContent(t *testing.T) { + type args struct { + name, testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + name string + args args + }{ + { + name: "Send normal message, setting message body with space character, expect consume success", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: RandomString(8), + body: " ", + }, + }, + { + name: "Send normal message, setting message body with chinese character, expect consume success", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: RandomString(8), + body: "中文字符", + }, + }, + { + name: "Send normal message, setting message body with emoji(😱) character, expect consume success", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: RandomString(8), + body: "😱", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ( + wg sync.WaitGroup + recvMsgCollector *RecvMsgsCollector + sendMsgCollector *SendMsgsCollector + ) + + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + // graceful stop producer + defer producer.GracefulStop() + + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 10) + wg.Done() + }() + go func() { + sendMsgCollector = SendNormalMessage(producer, tt.args.testTopic, tt.args.body, tt.args.msgtag, MsgCount, tt.args.keys) + }() + wg.Wait() + + CheckMsgsWithAll(t, sendMsgCollector, recvMsgCollector) + }) + } +} diff --git a/golang/pkg/message/message_key_test.go b/golang/pkg/message/message_key_test.go new file mode 100644 index 0000000..0d65345 --- /dev/null +++ b/golang/pkg/message/message_key_test.go @@ -0,0 +1,165 @@ +package rocketmqtest + +import ( + "context" + . "rocketmq-go-e2e/utils" + "sync" + "testing" +) + +func TestMessageKeySize(t *testing.T) { + type args struct { + name, testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + name string + args args + }{ + { + name: "Message Key equals 16KB, expect send success", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: RandomString(8), + keys: RandomString(16 * 1024), + body: RandomString(64), + }, + }, + { + name: "Message Key beyond 16KB, expect send failed", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: RandomString(8), + keys: RandomString(16*1024 + 1), + body: RandomString(64), + }, + }, + { + name: "Message Key contains invisible characters \u0000 , expect send failed", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: RandomString(8), + keys: "\u0000", + body: RandomString(64), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + // graceful stop producer + defer producer.GracefulStop() + + msg := BuildNormalMessage(tt.args.testTopic, tt.args.body, tt.args.msgtag, tt.args.keys) + + _, err := producer.Send(context.TODO(), msg) + if err != nil { + t.Fail() + t.Log("Error: ", err) + } + }) + } +} + +func TestMessageKeyContent(t *testing.T) { + type args struct { + testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + name string + args args + }{ + { + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: "中文字符", + body: RandomString(8), + }, + }, + } + for _, tt := range tests { + t.Run("Message key contains Chinese, expect send and consume success", func(t *testing.T) { + var wg sync.WaitGroup + + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + // graceful stop producer + defer producer.GracefulStop() + + var recvMsgCollector *RecvMsgsCollector + var sendMsgCollector *SendMsgsCollector + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 10) + wg.Done() + }() + go func() { + sendMsgCollector = SendNormalMessage(producer, tt.args.testTopic, tt.args.body, tt.args.msgtag, MsgCount, tt.args.keys) + }() + wg.Wait() + + CheckMsgsWithAll(t, sendMsgCollector, recvMsgCollector) + }) + t.Run("The message contains multiple keys, expect send and consume success", func(t *testing.T) { + var wg sync.WaitGroup + var k1 = RandomString(64) + + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + // graceful stop producer + defer producer.GracefulStop() + + var recvMsgCollector *RecvMsgsCollector + var sendMsgCollector *SendMsgsCollector + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 10) + wg.Done() + }() + go func() { + sendMsgCollector = SendNormalMessage(producer, tt.args.testTopic, tt.args.body, tt.args.msgtag, MsgCount, tt.args.keys, k1) + }() + wg.Wait() + + CheckMsgsWithAll(t, sendMsgCollector, recvMsgCollector) + }) + } +} diff --git a/golang/pkg/message/message_size_test.go b/golang/pkg/message/message_size_test.go new file mode 100644 index 0000000..eea9a64 --- /dev/null +++ b/golang/pkg/message/message_size_test.go @@ -0,0 +1,127 @@ +package rocketmqtest + +import ( + "context" + . "rocketmq-go-e2e/utils" + "testing" + "time" +) + +// todo Golang中不能发送4M的消息,初步认为是版本问题,后期将继续跟进 + +func TestNormalMessageSize(t *testing.T) { + type args struct { + name, testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + name string + args args + }{ + { + name: "Send normal messages synchronously with the body size of 4M+1, expect send failed", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(4*1024*1024 + 1), + }, + }, + { + name: "Send normal messages synchronously with the body size of 4M, expect send success", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(4 * 1024 * 1024), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + // graceful stop producer + defer producer.GracefulStop() + + msg := BuildNormalMessage(tt.args.testTopic, tt.args.body, tt.args.msgtag, tt.args.keys) + + _, err := producer.Send(context.TODO(), msg) + if err != nil { + t.Fail() + t.Log("Error: ", err) + } + }) + } +} + +func TestDelayMessageSize(t *testing.T) { + type args struct { + name, testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + deliveryTimestamp int + } + tests := []struct { + name string + args args + }{ + { + name: "Send delay messages synchronously with the body size of 4M+1, expect send failed", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(4*1024*1024 + 1), + deliveryTimestamp: 10, + }, + }, + { + name: "Send delay messages synchronously with the body size of 4M, expect send success", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(4 * 1024 * 1024), + deliveryTimestamp: 10, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + CreateDelayTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + // graceful stop producer + defer producer.GracefulStop() + + msg := BuildDelayMessage(tt.args.testTopic, tt.args.body, tt.args.msgtag, time.Duration(tt.args.deliveryTimestamp), tt.args.keys) + + _, err := producer.Send(context.TODO(), msg) + if err != nil { + t.Fail() + t.Log("Error: ", err) + } + }) + } +} diff --git a/golang/pkg/message/message_tag_test.go b/golang/pkg/message/message_tag_test.go new file mode 100644 index 0000000..049be10 --- /dev/null +++ b/golang/pkg/message/message_tag_test.go @@ -0,0 +1,149 @@ +package rocketmqtest + +import ( + "context" + . "rocketmq-go-e2e/utils" + "sync" + "testing" +) + +func TestMessageTagSizeAndSpecialCharacter(t *testing.T) { + type args struct { + name, testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + name string + args args + }{ + { + name: "Message Tag beyond 16KB,, expect send failed", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: RandomString(16*1024 + 1), + keys: RandomString(8), + body: "test", + }, + }, + { + name: "Message Tag equals 16KB, expect send success", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: RandomString(64 * 1024), + keys: RandomString(64), + body: RandomString(64), + }, + }, + { + name: "Message Tag contains invisible characters \u0000 , expect send failed", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: "\u0000", + keys: RandomString(64), + body: RandomString(64), + }, + }, + { + name: "Message Tag contains |, expect send failed", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + msgtag: "tag|", + keys: RandomString(64), + body: RandomString(64), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + // graceful stop producer + defer producer.GracefulStop() + + msg := BuildNormalMessage(tt.args.testTopic, tt.args.body, tt.args.msgtag, tt.args.keys) + + _, err := producer.Send(context.TODO(), msg) + if err != nil { + t.Fail() + t.Log("Error: ", err) + } + }) + } +} + +func TestMessageTagContentWithChinese(t *testing.T) { + type args struct { + name, testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + name string + args args + }{ + { + name: "Message Tag contains Chinese, expect send and consume success", + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: "中文字符", + keys: RandomString(64), + body: RandomString(64), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var wg sync.WaitGroup + + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + // graceful stop producer + defer producer.GracefulStop() + + var recvMsgCollector *RecvMsgsCollector + var sendMsgCollector *SendMsgsCollector + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 10) + wg.Done() + }() + go func() { + sendMsgCollector = SendNormalMessage(producer, tt.args.testTopic, tt.args.body, tt.args.msgtag, MsgCount, tt.args.keys) + }() + wg.Wait() + + CheckMsgsWithAll(t, sendMsgCollector, recvMsgCollector) + }) + } +} diff --git a/golang/pkg/producer/producer_init_test.go b/golang/pkg/producer/producer_init_test.go new file mode 100644 index 0000000..67443e3 --- /dev/null +++ b/golang/pkg/producer/producer_init_test.go @@ -0,0 +1,123 @@ +package producer + +import ( + . "rocketmq-go-e2e/utils" + "sync" + "testing" + "time" + + rmq_client "github.com/apache/rocketmq-clients/golang" + "github.com/apache/rocketmq-clients/golang/credentials" +) + +func TestProducerInit(t *testing.T) { + type args struct { + testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + args args + }{ + { + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(8), + }, + }, + } + for _, tt := range tests { + t.Run("Producer is normally set,expected success", func(t *testing.T) { + var wg sync.WaitGroup + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop producer + defer producer.GracefulStop() + + var recvMsgCollector *RecvMsgsCollector + var sendMsgCollector *SendMsgsCollector + + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 10) + wg.Done() + }() + go func() { + sendMsgCollector = SendNormalMessage(producer, tt.args.testTopic, "test", tt.args.msgtag, 10, tt.args.keys) + }() + wg.Wait() + + CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector) + }) + t.Run("The NAMESERVER_ADDR setting of the Producer failed, expect failed", func(t *testing.T) { + var wg sync.WaitGroup + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{ + Endpoint: "https://www.aliyun.com", + ConsumerGroup: tt.args.cm, + Credentials: &credentials.SessionCredentials{ + AccessKey: tt.args.ak, + AccessSecret: tt.args.sk, + }, + }, + rmq_client.WithAwaitDuration(time.Second*10), + rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{ + tt.args.testTopic: rmq_client.NewFilterExpression(tt.args.msgtag), + }), + ) + if err != nil { + t.Fail() + t.Log("Error: ", err) + return + } + // start simpleConsumer + err = simpleConsumer.Start() + if err != nil { + t.Fail() + t.Log("Error: ", err) + } + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop producer + defer producer.GracefulStop() + + var recvMsgCollector *RecvMsgsCollector + var sendMsgCollector *SendMsgsCollector + + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 10) + wg.Done() + + }() + go func() { + sendMsgCollector = SendNormalMessage(producer, tt.args.testTopic, "test", tt.args.msgtag, 10, tt.args.keys) + }() + wg.Wait() + + CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector) + }) + } +} diff --git a/golang/pkg/server/delay/delaymsg_test.go b/golang/pkg/server/delay/delaymsg_test.go new file mode 100644 index 0000000..80ecc40 --- /dev/null +++ b/golang/pkg/server/delay/delaymsg_test.go @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package delay_test + +import ( + . "rocketmq-go-e2e/utils" + "sync" + "testing" + "time" +) + +func TestDelaySendMessage(t *testing.T) { + type args struct { + testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + args args + }{ + { + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(8), + }, + }, + } + for _, tt := range tests { + t.Run("Send 10 messages, set the message delivery time to 30 seconds after the current system time, and expect to consume all 10 messages after 30 seconds.", func(t *testing.T) { + var ( + wg sync.WaitGroup + msgCount = 10 + delaySeconds = 30 + ) + CreateDelayTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop producer + defer producer.GracefulStop() + + var recvMsgCollector *RecvMsgsCollector + sendMsgCollector := NewSendMsgsCollector() + + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, int64(10+delaySeconds)) + wg.Done() + }() + go func() { + for i := 0; i < msgCount; i++ { + msg := BuildDelayMessage(tt.args.testTopic, tt.args.body, tt.args.msgtag, time.Duration(delaySeconds), tt.args.keys) + SendMessage(producer, msg, sendMsgCollector) + } + }() + wg.Wait() + + CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector) + }) + t.Run("Send 10 messages, set the message delivery time to 10 seconds before the current system time, the expected timing will not take effect, and all 10 messages can be consumed immediately.", func(t *testing.T) { + var ( + wg sync.WaitGroup + msgCount = 10 + delaySeconds = 30 + ) + CreateDelayTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop producer + defer producer.GracefulStop() + + var recvMsgCollector *RecvMsgsCollector + sendMsgCollector := NewSendMsgsCollector() + + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, int64(10+delaySeconds)) + wg.Done() + }() + go func() { + for i := 0; i < msgCount; i++ { + duration, _ := time.ParseDuration("-10s") // 定义时间差量为10秒前 + timeBefore10s := time.Now().Add(duration) + elapsedTime := time.Since(timeBefore10s) + msg := BuildDelayMessage(tt.args.testTopic, "test", tt.args.msgtag, elapsedTime, tt.args.keys) + SendMessage(producer, msg, sendMsgCollector) + } + }() + wg.Wait() + + CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector) + }) + t.Run("Send a message and set the delivery time of the message to 24 hours and 5 seconds from the current system time. The expected message delivery fails.", func(t *testing.T) { + var ( + wg sync.WaitGroup + msgCount = 10 + delaySeconds = 30 + ) + CreateDelayTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop producer + defer producer.GracefulStop() + + var recvMsgCollector *RecvMsgsCollector + sendMsgCollector := NewSendMsgsCollector() + + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, int64(10+delaySeconds)) + wg.Done() + }() + go func() { + for i := 0; i < msgCount; i++ { + duration, _ := time.ParseDuration("24h5s") // 定义时间差量为24小时零5秒后 + msg := BuildDelayMessage(tt.args.testTopic, "test", tt.args.msgtag, duration, tt.args.keys) + SendMessage(producer, msg, sendMsgCollector) + } + }() + wg.Wait() + + CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector) + //todo 添加运行失败,但是不报错的逻辑 + }) + } +} diff --git a/golang/pkg/server/normal/normalmsg_test.go b/golang/pkg/server/normal/normalmsg_test.go new file mode 100644 index 0000000..3adbad0 --- /dev/null +++ b/golang/pkg/server/normal/normalmsg_test.go @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package normal_test + +import ( + . "rocketmq-go-e2e/utils" + "sync" + "testing" +) + +func TestMessageSyncAndASyncTransfer(t *testing.T) { + type args struct { + testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + args args + }{ + { + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(8), + }, + }, + } + for _, tt := range tests { + t.Run("The synchronous test message is sent normally, expecting success", func(t *testing.T) { + var wg sync.WaitGroup + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop producer + defer producer.GracefulStop() + + var recvMsgCollector *RecvMsgsCollector + var sendMsgCollector *SendMsgsCollector + + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 10) + wg.Done() + }() + go func() { + sendMsgCollector = SendNormalMessage(producer, tt.args.testTopic, "test", tt.args.msgtag, 10, tt.args.keys) + }() + wg.Wait() + + CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector) + }) + t.Run("The asynchronous test message is sent normally, expecting success", func(t *testing.T) { + var wg sync.WaitGroup + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop producer + defer producer.GracefulStop() + + msg := BuildNormalMessage(tt.args.testTopic, tt.args.body, tt.args.msgtag, tt.args.keys) + + var recvMsgCollector *RecvMsgsCollector + sendMsgCollector := NewSendMsgsCollector() + + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 10) + wg.Done() + }() + go func() { + SendMessageAsync(producer, msg, sendMsgCollector) + }() + wg.Wait() + + CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector) + }) + } +} diff --git a/golang/pkg/server/order/ordermsg_test.go b/golang/pkg/server/order/ordermsg_test.go new file mode 100644 index 0000000..b830151 --- /dev/null +++ b/golang/pkg/server/order/ordermsg_test.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package order_test + +import ( + . "rocketmq-go-e2e/utils" + "sync" + "testing" +) + +func TestOrderSendOrderly(t *testing.T) { + type args struct { + testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + name string + args args + }{ + { + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(8), + }, + }, + } + for _, tt := range tests { + t.Run("Send 100 sequential messages synchronously, set 2 Messagegroups, and expect these 100 messages to be sequentially consumed by PushConsumer", func(t *testing.T) { + var wg sync.WaitGroup + CreateFIFOTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop producer + defer producer.GracefulStop() + + var recvMsgCollector *RecvMsgsCollector + sendMsgCollector := NewSendMsgsCollector() + + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 10) + wg.Done() + }() + go func() { + for i := 0; i <= 100; i++ { + msg := BuildFIFOMessage(tt.args.testTopic, tt.args.body, tt.args.msgtag, tt.args.cm, tt.args.keys) + SendMessage(producer, msg, sendMsgCollector) + } + }() + wg.Wait() + + CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector) + }) + } +} diff --git a/golang/pkg/server/transaction/transmsg_test.go b/golang/pkg/server/transaction/transmsg_test.go new file mode 100644 index 0000000..0b15036 --- /dev/null +++ b/golang/pkg/server/transaction/transmsg_test.go @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package transaction_test + +import ( + "log" + . "rocketmq-go-e2e/utils" + "sync" + "testing" + + rmq_client "github.com/apache/rocketmq-clients/golang" +) + +func TestSendTransactionMessage(t *testing.T) { + type args struct { + testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + args args + }{ + { + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(8), + }, + }, + } + for _, tt := range tests { + t.Run("The synchronous test message is sent normally, expecting success", func(t *testing.T) { + var ( + recvMsgCollector *RecvMsgsCollector + sendMsgCollector *SendMsgsCollector + wg sync.WaitGroup + ) + CreateTransactionTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // graceful stop simpleConsumer + defer simpleConsumer.GracefulStop() + + // new producer instance + var checker = &rmq_client.TransactionChecker{ + Check: func(msgView *rmq_client.MessageView) rmq_client.TransactionResolution { + log.Printf("check transaction message: %v", msgView) + sendMsgCollector.MsgIds = append(sendMsgCollector.MsgIds, msgView.GetMessageId()) + msg := &rmq_client.Message{ + Topic: msgView.GetTopic(), + Body: msgView.GetBody(), + Tag: msgView.GetTag(), + } + msg.SetKeys(msgView.GetKeys()...) + //msg.SetMessageGroup(*msgView.GetMessageGroup()) + //msg.SetDelayTimestamp(*msgView.GetDeliveryTimestamp()) + sendMsgCollector.SendMsgs = append(sendMsgCollector.SendMsgs, msg) + return rmq_client.COMMIT + }, + } + + // new producer instance + producer := BuildTransactionProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, checker, tt.args.testTopic) + + // graceful stop producer + defer producer.GracefulStop() + + wg.Add(1) + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 10) + wg.Done() + }() + go func() { + sendMsgCollector = SendTransactionMessage(producer, tt.args.testTopic, "test", tt.args.msgtag, 11, tt.args.keys) + }() + wg.Wait() + + CheckMsgsWithMsgId(t, sendMsgCollector, recvMsgCollector) + }) + } +} diff --git a/golang/pkg/simple/simple_ack_test.go b/golang/pkg/simple/simple_ack_test.go new file mode 100644 index 0000000..04c6b16 --- /dev/null +++ b/golang/pkg/simple/simple_ack_test.go @@ -0,0 +1,89 @@ +package simple + +import ( + . "rocketmq-go-e2e/utils" + "sync" + "testing" +) + +func TestMessageBodyContent(t *testing.T) { + type args struct { + testTopic, nameServer, grpcEndpoint, clusterName, ak, sk, cm, msgtag, keys, body string + } + tests := []struct { + args args + }{ + { + args: args{ + testTopic: GetTopicName(), + nameServer: NAMESERVER, + grpcEndpoint: GRPC_ENDPOINT, + clusterName: CLUSTER_NAME, + ak: "", + sk: "", + cm: GetGroupName(), + msgtag: RandomString(8), + keys: RandomString(8), + body: RandomString(8), + }, + }, + } + for _, tt := range tests { + t.Run("Send 20 normal messages synchronously and expect consume with receive() and ack() messages successful", func(t *testing.T) { + var ( + wg sync.WaitGroup + recvMsgCollector *RecvMsgsCollector + sendMsgCollector *SendMsgsCollector + ) + + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + // graceful stop producer + defer producer.GracefulStop() + + wg.Add(1) + + go func() { + sendMsgCollector = SendNormalMessage(producer, tt.args.testTopic, tt.args.body, tt.args.msgtag, 20, tt.args.keys) + }() + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 30) + wg.Done() + }() + wg.Wait() + CheckMsgsWithAll(t, sendMsgCollector, recvMsgCollector) + }) + t.Run("Send 20 normal messages synchronously. Expect consume with receiveAsync() and ack() messages successful", func(t *testing.T) { + var ( + wg sync.WaitGroup + recvMsgCollector *RecvMsgsCollector + sendMsgCollector *SendMsgsCollector + ) + + CreateTopic(tt.args.testTopic, "", tt.args.clusterName, tt.args.nameServer) + simpleConsumer := BuildSimpleConsumer(tt.args.grpcEndpoint, tt.args.cm, tt.args.msgtag, tt.args.ak, tt.args.sk, tt.args.testTopic) + + // new producer instance + producer := BuildProducer(tt.args.grpcEndpoint, tt.args.ak, tt.args.sk, tt.args.testTopic) + // graceful stop producer + defer producer.GracefulStop() + + wg.Add(1) + + go func() { + sendMsgCollector = SendNormalMessageAsync(producer, tt.args.testTopic, tt.args.body, tt.args.msgtag, 20, tt.args.keys) + }() + + go func() { + recvMsgCollector = RecvMessage(simpleConsumer, MaxMessageNum, InvisibleDuration, 30) + wg.Done() + }() + wg.Wait() + CheckMsgsWithAll(t, sendMsgCollector, recvMsgCollector) + }) + } +} diff --git a/golang/utils/CheckUtils.go b/golang/utils/CheckUtils.go index 7375491..1644448 100644 --- a/golang/utils/CheckUtils.go +++ b/golang/utils/CheckUtils.go @@ -18,12 +18,13 @@ package utils import ( - rmq_client "github.com/apache/rocketmq-clients/golang" - "github.com/stretchr/testify/assert" "math" "sort" "testing" "time" + + rmq_client "github.com/apache/rocketmq-clients/golang" + "github.com/stretchr/testify/assert" ) // check msg with msgId received only once diff --git a/golang/utils/ClientUtils.go b/golang/utils/ClientUtils.go index 93db756..7deeaea 100644 --- a/golang/utils/ClientUtils.go +++ b/golang/utils/ClientUtils.go @@ -29,12 +29,13 @@ import ( ) var ( + MsgCount = 10 // maximum waiting time for receive func awaitDuration = time.Second * 5 - // maximum number of messages received at one time - maxMessageNum int32 = 32 - // invisibleDuration should > 20s - invisibleDuration = time.Second * 20 + // MaxMessageNum maximum number of messages received at one time + MaxMessageNum int32 = 32 + // InvisibleDuration should > 20s + InvisibleDuration = time.Second * 20 // receive messages in a loop GRPC_ENDPOINT = os.Getenv("GRPC_ENDPOINT") NAMESERVER = os.Getenv("NAMESERVER")