diff --git a/.github/workflows/dataflow_engine_chaos.yaml b/.github/workflows/dataflow_engine_chaos.yaml
index 20fd0cbc8c..cc35cdd6bc 100644
--- a/.github/workflows/dataflow_engine_chaos.yaml
+++ b/.github/workflows/dataflow_engine_chaos.yaml
@@ -21,7 +21,7 @@ jobs:
   base:
     # The type of runner that the job will run on
     runs-on: ubuntu-20.04
-    timeout-minutes: 30
+    timeout-minutes: 50
     strategy:
       fail-fast: false
       matrix:
@@ -55,7 +55,7 @@ jobs:
           key: ${{ runner.os }}-dataflow-${{ hashFiles('go.sum') }}
 
       - name: Create k8s Kind Cluster
-        uses: helm/kind-action@v1.2.0
+        uses: helm/kind-action@v1.4.0
        with:
          cluster_name: dataflow-engine-cluster
          config: ${{ github.workspace }}/engine/chaos/manifests/kind-cluster.yaml
@@ -71,7 +71,9 @@ jobs:
           helm version
 
       - name: Build dataflow engine binary
-        run: make tiflow tiflow-chaos-case
+        run: |
+          make tiflow tiflow-chaos-case
+          cp -r $GITHUB_WORKSPACE/engine/chaos/cases/conf/ $GITHUB_WORKSPACE/bin/engine-conf
 
       - name: Build Dataflow engine docker image
         run: |
@@ -82,6 +84,61 @@ jobs:
         run: |
           kind load docker-image dataflow:chaos --name dataflow-engine-cluster
 
+      # Set up upstream instances
+      - name: Set up sources
+        run: |
+          kubectl apply -f $GITHUB_WORKSPACE/dm/chaos/manifests/sources.yaml
+          kubectl get -f $GITHUB_WORKSPACE/dm/chaos/manifests/sources.yaml
+          kubectl describe -f $GITHUB_WORKSPACE/dm/chaos/manifests/sources.yaml
+      - name: Wait for sources ready # kubectl wait --all does not work
+        run: |
+          kubectl wait --for=condition=Ready pod/mysql57-0 --timeout=300s || true
+          kubectl wait --for=condition=Ready pod/mysql8-0 --timeout=300s || true
+          kubectl wait --for=condition=Ready pod/mariadb-0 --timeout=300s || true
+          sleep 10
+          echo show pvc
+          kubectl get pvc -l app=sources -o wide
+          echo show pv
+          kubectl get pv -o wide
+          echo show svc
+          kubectl get svc -l app=sources -o wide
+          echo show sts
+          kubectl get sts -l app=sources -o wide
+          echo show po
+          kubectl get po -l app=sources -o wide
+          echo describe po
+          kubectl describe po -l app=sources
+          echo describe pvc
+          kubectl describe pvc -l app=sources
+          kubectl wait --for=condition=Ready pod/mysql57-0 --timeout=0s
+          kubectl wait --for=condition=Ready pod/mysql8-0 --timeout=0s
+          kubectl wait --for=condition=Ready pod/mariadb-0 --timeout=0s
+
+      # Set up downstream TiDB instance (deploy a TiDB with mockTiKV, not a TidbCluster managed by TiDB Operator)
+      - name: Set up TiDB
+        run: |
+          kubectl apply -f $GITHUB_WORKSPACE/dm/chaos/manifests/tidb.yaml
+          kubectl get -f $GITHUB_WORKSPACE/dm/chaos/manifests/tidb.yaml
+          kubectl describe -f $GITHUB_WORKSPACE/dm/chaos/manifests/tidb.yaml
+      - name: Wait for TiDB ready
+        run: |
+          kubectl wait --for=condition=Ready pod/tidb-0 --timeout=10m || true
+          echo show pvc
+          kubectl get pvc -l app=tidb -o wide
+          echo show pv
+          kubectl get pv -o wide
+          echo show svc
+          kubectl get svc -l app=tidb -o wide
+          echo show sts
+          kubectl get sts -l app=tidb -o wide
+          echo show po
+          kubectl get po -l app=tidb -o wide
+          echo describe po
+          kubectl describe po -l app=tidb
+          echo describe pvc
+          kubectl describe pvc -l app=tidb
+          kubectl wait --for=condition=Ready pod/tidb-0 --timeout=0s
       # Set up metastore and basic services
       - name: Set up metastore and basic services
         run: |
@@ -196,6 +253,17 @@ jobs:
           kubectl describe -f $GITHUB_WORKSPACE/engine/chaos/manifests/cases.yaml
           kubectl get pods
 
+      # FIXME: remove this after fixing https://github.com/pingcap/tiflow/issues/7304
+      - name: Wait for DM to enter sync stage
+        run: |
+          for idx in $(seq 0 300); do
+            echo "wait dm enter sync stage"
+            if kubectl logs 
job.batch/chaos-test-case | grep "full mode of the task has completed" ; then + break + fi + sleep 1 + done + - name: Encode chaos-mesh action run: | echo CFG_BASE64=$(base64 -w 0 $GITHUB_WORKSPACE/engine/chaos/manifests/${{ matrix.chaos-obj }}.yaml) >> $GITHUB_ENV @@ -204,7 +272,6 @@ jobs: uses: chaos-mesh/chaos-mesh-action@master env: CFG_BASE64: ${{ env.CFG_BASE64 }} - CHAOS_MESH_VERSION: v1.0.0 # check whether complete with 1m * 20 times. - name: Wait for chaos test case complete diff --git a/.github/workflows/dm_chaos.yaml b/.github/workflows/dm_chaos.yaml index d5284c5db5..49c7d54969 100644 --- a/.github/workflows/dm_chaos.yaml +++ b/.github/workflows/dm_chaos.yaml @@ -21,7 +21,7 @@ jobs: base: # The type of runner that the job will run on runs-on: ubuntu-20.04 - timeout-minutes: 30 + timeout-minutes: 50 strategy: fail-fast: false matrix: @@ -68,7 +68,7 @@ jobs: key: ${{ runner.os }}-ticdc-tools-${{ hashFiles('tools/check/go.sum') }} - name: Create k8s Kind Cluster - uses: helm/kind-action@v1.2.0 + uses: helm/kind-action@v1.4.0 - name: Print cluster information run: | @@ -247,7 +247,6 @@ jobs: uses: chaos-mesh/chaos-mesh-action@master env: CFG_BASE64: ${{ env.CFG_BASE64 }} - CHAOS_MESH_VERSION: v1.0.0 # check whether complete with 1m * 20 times. - name: Wait for chaos test case complete diff --git a/dm/chaos/manifests/io-chaos-dm.yaml b/dm/chaos/manifests/io-chaos-dm.yaml index 0e4b993bd1..1f5eb0382a 100644 --- a/dm/chaos/manifests/io-chaos-dm.yaml +++ b/dm/chaos/manifests/io-chaos-dm.yaml @@ -1,15 +1,20 @@ apiVersion: chaos-mesh.org/v1alpha1 -kind: IoChaos +kind: Schedule metadata: name: io-delay-dm labels: app: io-delay-dm spec: - action: latency - mode: one - selector: - pods: - default: # default namespace + schedule: '@every 2m' + type: IOChaos + historyLimit: 5 + concurrencyPolicy: Forbid + ioChaos: + action: latency + mode: one + selector: + pods: + default: - dm-master-0 - dm-master-1 - dm-master-2 @@ -17,10 +22,8 @@ spec: - dm-worker-1 - dm-worker-2 - dm-worker-3 - volumePath: /data - path: "/data/**/*" - delay: "100ms" - percent: 50 - duration: "60s" - scheduler: - cron: "@every 2m" + volumePath: /data + path: /data/**/* + delay: 100ms + percent: 50 + duration: 60s diff --git a/dm/chaos/manifests/network-emulation-dm.yaml b/dm/chaos/manifests/network-emulation-dm.yaml index d1e060d954..a63a6f1bdd 100644 --- a/dm/chaos/manifests/network-emulation-dm.yaml +++ b/dm/chaos/manifests/network-emulation-dm.yaml @@ -1,17 +1,20 @@ ---- -# A Network Loss action causes network packets to drop randomly apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos +kind: Schedule metadata: name: network-loss-dm labels: app: network-loss-dm spec: - action: loss - mode: one - selector: - pods: - default: # default namespace + schedule: 2-59/5 * * * * + type: NetworkChaos + historyLimit: 5 + concurrencyPolicy: Forbid + networkChaos: + action: loss + mode: one + selector: + pods: + default: - dm-master-0 - dm-master-1 - dm-master-2 @@ -19,92 +22,7 @@ spec: - dm-worker-1 - dm-worker-2 - dm-worker-3 - loss: - loss: "25" - correlation: "25" - duration: "30s" - scheduler: - cron: "2-59/5 * * * *" # At every 5th minute from 2 through 59, (2, 7, 12, ...) 
- - -# A Network Delay action causes delays in message sending ---- -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -metadata: - name: network-delay-dm - labels: - app: network-delay-dm -spec: - action: delay - mode: one - selector: - pods: - default: # default namespace - - dm-master-0 - - dm-master-1 - - dm-master-2 - - dm-worker-0 - - dm-worker-1 - - dm-worker-2 - delay: - latency: "90ms" - correlation: "25" - jitter: "90ms" - duration: "30s" - scheduler: - cron: "3-59/5 * * * *" # At every 5th minute from 3 through 59, (3, 8, 13, ...) - ---- -# A Network Duplicate action causes packet duplication -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -metadata: - name: network-duplicate-dm - labels: - app: network-duplicate-dm -spec: - action: duplicate - mode: one - selector: - pods: - default: # default namespace - - dm-master-0 - - dm-master-1 - - dm-master-2 - - dm-worker-0 - - dm-worker-1 - - dm-worker-2 - duplicate: - duplicate: "40" - correlation: "25" - duration: "30s" - scheduler: - cron: "4-59/5 * * * *" # At every 5th minute from 4 through 59, (4, 9, 14, ...) - ---- -# A Network Corrupt action causes packet corruption -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -metadata: - name: network-corrupt-dm - labels: - app: network-corrupt-dm -spec: - action: corrupt - mode: one - selector: - pods: - default: # default namespace - - dm-master-0 - - dm-master-1 - - dm-master-2 - - dm-worker-0 - - dm-worker-1 - - dm-worker-2 - corrupt: - corrupt: "40" - correlation: "25" - duration: "30s" - scheduler: - cron: "5-59/5 * * * *" # At every 5th minute from 5 through 59, (5, 10, 15, ...) \ No newline at end of file + loss: + loss: "25" + correlation: "25" + duration: 30s diff --git a/dm/chaos/manifests/network-partition-dm.yaml b/dm/chaos/manifests/network-partition-dm.yaml index 99f7c33cf1..21f3fc6995 100644 --- a/dm/chaos/manifests/network-partition-dm.yaml +++ b/dm/chaos/manifests/network-partition-dm.yaml @@ -1,47 +1,24 @@ ---- -# network partition between DM-worker and DM-master apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos +kind: Schedule metadata: name: network-partition-dm-worker-master labels: app: network-partition-dm-worker-master spec: - action: partition - mode: one - selector: - labelSelectors: - "app": "dm-worker" - direction: both - target: - selector: - labelSelectors: - "app": "dm-master" + schedule: 2-59/3 * * * * + type: NetworkChaos + historyLimit: 5 + concurrencyPolicy: Forbid + networkChaos: + action: partition mode: one - duration: "30s" - scheduler: - cron: "2-59/3 * * * *" # At every 3rd minute from 2 through 59, (2, 5, 8, ...) - ---- -# network partition between DM-master members -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -metadata: - name: network-partition-dm-master-master - labels: - app: network-partition-dm-master-master -spec: - action: partition - mode: one - selector: - labelSelectors: - "app": "dm-master" - direction: both - target: selector: labelSelectors: - "app": "dm-master" - mode: one - duration: "30s" - scheduler: - cron: "3-59/3 * * * *" # At every 3rd minute from 3 through 59, (3, 6, 9, ...) 
+ app: dm-worker + direction: both + target: + selector: + labelSelectors: + app: dm-master + mode: one + duration: 30s diff --git a/dm/chaos/manifests/pod-failure-dm.yaml b/dm/chaos/manifests/pod-failure-dm.yaml index ee6b845c88..58b0b63dab 100644 --- a/dm/chaos/manifests/pod-failure-dm.yaml +++ b/dm/chaos/manifests/pod-failure-dm.yaml @@ -1,16 +1,21 @@ apiVersion: chaos-mesh.org/v1alpha1 -kind: PodChaos +kind: Schedule metadata: name: pod-failure-dm labels: app: pod-failure-dm spec: - action: pod-failure - mode: one - duration: "30s" - selector: - pods: - default: # default namespace + schedule: '@every 2m' + type: PodChaos + historyLimit: 5 + concurrencyPolicy: Forbid + podChaos: + action: pod-failure + mode: one + duration: 30s + selector: + pods: + default: - dm-master-0 - dm-master-1 - dm-master-2 @@ -18,5 +23,3 @@ spec: - dm-worker-1 - dm-worker-2 - dm-worker-3 - scheduler: - cron: "@every 2m" diff --git a/dm/chaos/manifests/pod-kill-dm.yaml b/dm/chaos/manifests/pod-kill-dm.yaml index b016f5f43f..959fc03bcb 100644 --- a/dm/chaos/manifests/pod-kill-dm.yaml +++ b/dm/chaos/manifests/pod-kill-dm.yaml @@ -1,16 +1,21 @@ apiVersion: chaos-mesh.org/v1alpha1 -kind: PodChaos +kind: Schedule metadata: name: pod-kill-dm labels: app: pod-kill-dm spec: - action: pod-kill - mode: one - gracePeriod: 30 - selector: - pods: - default: # default namespace + schedule: '@every 1m' + type: PodChaos + historyLimit: 5 + concurrencyPolicy: Forbid + podChaos: + action: pod-kill + mode: one + gracePeriod: 30 + selector: + pods: + default: - dm-master-0 - dm-master-1 - dm-master-2 @@ -18,5 +23,3 @@ spec: - dm-worker-1 - dm-worker-2 - dm-worker-3 - scheduler: - cron: "@every 1m" diff --git a/dm/simulator/internal/config/config.go b/dm/simulator/config/config.go similarity index 56% rename from dm/simulator/internal/config/config.go rename to dm/simulator/config/config.go index 9f8508d21f..9c5ff73b18 100644 --- a/dm/simulator/internal/config/config.go +++ b/dm/simulator/config/config.go @@ -14,6 +14,13 @@ // Package config is the configuration definitions used by the simulator. package config +import ( + "strconv" + "strings" + + "github.com/pingcap/tidb/util/dbutil" +) + // TableConfig is the sub config for describing a simulating table in the data source. 
 type TableConfig struct {
 	TableID string `yaml:"id"`
@@ -29,3 +36,38 @@ type ColumnDefinition struct {
 	DataType string `yaml:"type"`
 	DataLen  int    `yaml:"length"`
 }
+
+// GenCreateTable generates a CREATE TABLE statement from the table config.
+func (t *TableConfig) GenCreateTable() string {
+	var buf strings.Builder
+	buf.WriteString("CREATE TABLE ")
+	buf.WriteString(dbutil.TableName(t.DatabaseName, t.TableName))
+	buf.WriteByte('(')
+	for i, col := range t.Columns {
+		if i != 0 {
+			buf.WriteByte(',')
+		}
+		buf.WriteString(dbutil.ColumnName(col.ColumnName))
+		buf.WriteByte(' ')
+		buf.WriteString(col.DataType)
+		if col.DataLen > 0 {
+			buf.WriteByte('(')
+			buf.WriteString(strconv.Itoa(col.DataLen))
+			buf.WriteByte(')')
+		}
+	}
+	if len(t.UniqueKeyColumnNames) > 0 {
+		buf.WriteString(",UNIQUE KEY ")
+		buf.WriteString(dbutil.ColumnName(strings.Join(t.UniqueKeyColumnNames, "_")))
+		buf.WriteByte('(')
+		for i, ukColName := range t.UniqueKeyColumnNames {
+			if i != 0 {
+				buf.WriteString(",")
+			}
+			buf.WriteString(dbutil.ColumnName(ukColName))
+		}
+		buf.WriteByte(')')
+	}
+	buf.WriteByte(')')
+	return buf.String()
+}
diff --git a/dm/simulator/config/config_test.go b/dm/simulator/config/config_test.go
new file mode 100644
index 0000000000..cecb3a24d0
--- /dev/null
+++ b/dm/simulator/config/config_test.go
@@ -0,0 +1,52 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package config is the configuration definitions used by the simulator.
+package config
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestConfig(t *testing.T) {
+	tableConfig := &TableConfig{
+		DatabaseName: "games",
+		TableName:    "members",
+		Columns: []*ColumnDefinition{
+			{
+				ColumnName: "id",
+				DataType:   "int",
+				DataLen:    11,
+			},
+			{
+				ColumnName: "name",
+				DataType:   "varchar",
+				DataLen:    255,
+			},
+			{
+				ColumnName: "age",
+				DataType:   "int",
+				DataLen:    11,
+			},
+			{
+				ColumnName: "team_id",
+				DataType:   "int",
+				DataLen:    11,
+			},
+		},
+		UniqueKeyColumnNames: []string{"id", "name"},
+	}
+	require.Equal(t, "CREATE TABLE `games`.`members`(`id` int(11),`name` varchar(255),`age` int(11),`team_id` int(11),UNIQUE KEY `id_name`(`id`,`name`))", tableConfig.GenCreateTable())
+}
diff --git a/dm/simulator/internal/mcp/errors.go b/dm/simulator/mcp/errors.go
similarity index 100%
rename from dm/simulator/internal/mcp/errors.go
rename to dm/simulator/mcp/errors.go
diff --git a/dm/simulator/internal/mcp/mcp.go b/dm/simulator/mcp/mcp.go
similarity index 100%
rename from dm/simulator/internal/mcp/mcp.go
rename to dm/simulator/mcp/mcp.go
diff --git a/dm/simulator/internal/mcp/mcp_test.go b/dm/simulator/mcp/mcp_test.go
similarity index 100%
rename from dm/simulator/internal/mcp/mcp_test.go
rename to dm/simulator/mcp/mcp_test.go
diff --git a/dm/simulator/internal/mcp/uk.go b/dm/simulator/mcp/uk.go
similarity index 91%
rename from dm/simulator/internal/mcp/uk.go
rename to dm/simulator/mcp/uk.go
index ce7ee72ef5..73f300a149 100644
--- a/dm/simulator/internal/mcp/uk.go
+++ b/dm/simulator/mcp/uk.go
@@ -15,6 +15,7 @@ package mcp
 
 import (
 	"fmt"
+	"sort"
 	"strings"
 	"sync"
 )
@@ -74,6 +75,24 @@ func (uk *UniqueKey) GetValue() map[string]interface{} {
 	return result
 }
 
+// GetValueHash returns a deterministic fingerprint of the UK values: the column names are sorted so the result is independent of map iteration order.
+func (uk *UniqueKey) GetValueHash() string {
+	uk.RLock()
+	defer uk.RUnlock()
+
+	keys := make([]string, 0)
+	for k := range uk.value {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	var b strings.Builder
+	for _, k := range keys {
+		v := uk.value[k]
+		fmt.Fprintf(&b, "%s = %v; ", k, v)
+	}
+	return b.String()
+}
+
 // SetValue sets the UK value map.
 // The input values are cloned into the UK,
 // and further modifications on the input map won't affect the values inside the UK.
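
GetValueHash above canonicalizes the UK value map by sorting the column names before formatting, so two keys holding the same values always produce the same string regardless of Go's randomized map iteration order; the chaos case added below relies on this property to deduplicate generated rows. A minimal standalone sketch of the same pattern (valueHash is an illustrative name, not part of the mcp package):

package main

import (
	"fmt"
	"sort"
	"strings"
)

// valueHash mirrors UniqueKey.GetValueHash: sort the column names first so
// that map iteration order cannot change the output.
func valueHash(value map[string]interface{}) string {
	keys := make([]string, 0, len(value))
	for k := range value {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&b, "%s = %v; ", k, value[k])
	}
	return b.String()
}

func main() {
	a := valueHash(map[string]interface{}{"id": 1, "name": "alice"})
	b := valueHash(map[string]interface{}{"name": "alice", "id": 1})
	fmt.Println(a == b) // true: the fingerprint is order-independent
}
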
diff --git a/dm/simulator/internal/mcp/uk_test.go b/dm/simulator/mcp/uk_test.go similarity index 100% rename from dm/simulator/internal/mcp/uk_test.go rename to dm/simulator/mcp/uk_test.go diff --git a/dm/simulator/internal/sqlgen/errors.go b/dm/simulator/sqlgen/errors.go similarity index 100% rename from dm/simulator/internal/sqlgen/errors.go rename to dm/simulator/sqlgen/errors.go diff --git a/dm/simulator/internal/sqlgen/impl.go b/dm/simulator/sqlgen/impl.go similarity index 97% rename from dm/simulator/internal/sqlgen/impl.go rename to dm/simulator/sqlgen/impl.go index d37be75b73..edd9ec687b 100644 --- a/dm/simulator/internal/sqlgen/impl.go +++ b/dm/simulator/sqlgen/impl.go @@ -24,8 +24,8 @@ import ( "github.com/pingcap/tidb/parser/opcode" _ "github.com/pingcap/tidb/types/parser_driver" // import this to make the parser work "github.com/pingcap/tiflow/dm/pkg/log" - "github.com/pingcap/tiflow/dm/simulator/internal/config" - "github.com/pingcap/tiflow/dm/simulator/internal/mcp" + "github.com/pingcap/tiflow/dm/simulator/config" + "github.com/pingcap/tiflow/dm/simulator/mcp" "go.uber.org/zap" ) @@ -269,3 +269,7 @@ func (g *sqlGeneratorImpl) GenLoadUniqueKeySQL() (string, []*config.ColumnDefini } return sql, cols, nil } + +func (g *sqlGeneratorImpl) GenCreateTable() string { + return g.tableConfig.GenCreateTable() +} diff --git a/dm/simulator/internal/sqlgen/impl_test.go b/dm/simulator/sqlgen/impl_test.go similarity index 98% rename from dm/simulator/internal/sqlgen/impl_test.go rename to dm/simulator/sqlgen/impl_test.go index bad9bd7b92..da5603f57b 100644 --- a/dm/simulator/internal/sqlgen/impl_test.go +++ b/dm/simulator/sqlgen/impl_test.go @@ -21,8 +21,8 @@ import ( "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tiflow/dm/pkg/log" - "github.com/pingcap/tiflow/dm/simulator/internal/config" - "github.com/pingcap/tiflow/dm/simulator/internal/mcp" + "github.com/pingcap/tiflow/dm/simulator/config" + "github.com/pingcap/tiflow/dm/simulator/mcp" "github.com/stretchr/testify/suite" ) diff --git a/dm/simulator/internal/sqlgen/sqlgen.go b/dm/simulator/sqlgen/sqlgen.go similarity index 89% rename from dm/simulator/internal/sqlgen/sqlgen.go rename to dm/simulator/sqlgen/sqlgen.go index 68dd0dd5c5..83b4533076 100644 --- a/dm/simulator/internal/sqlgen/sqlgen.go +++ b/dm/simulator/sqlgen/sqlgen.go @@ -15,8 +15,8 @@ package sqlgen import ( - "github.com/pingcap/tiflow/dm/simulator/internal/config" - "github.com/pingcap/tiflow/dm/simulator/internal/mcp" + "github.com/pingcap/tiflow/dm/simulator/config" + "github.com/pingcap/tiflow/dm/simulator/mcp" ) // SQLGenerator contains all the operations for generating SQLs. @@ -35,4 +35,6 @@ type SQLGenerator interface { GenUpdateRow(*mcp.UniqueKey) (string, error) // GenDeleteRow generates a DELETE SQL for the given unique key. GenDeleteRow(*mcp.UniqueKey) (string, error) + // GenCreateTable generates a CreateTable SQL by table config. + GenCreateTable() string } diff --git a/engine/chaos/cases/case_dm_job.go b/engine/chaos/cases/case_dm_job.go new file mode 100644 index 0000000000..c2b9de6c11 --- /dev/null +++ b/engine/chaos/cases/case_dm_job.go @@ -0,0 +1,40 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "path/filepath" + + dmchaos "github.com/pingcap/tiflow/engine/chaos/cases/dm" + "golang.org/x/sync/errgroup" +) + +var filenames = []string{"dmjob"} + +func runDMJobCases(ctx context.Context, cfg *config) error { + eg, ctx2 := errgroup.WithContext(ctx) + for _, f := range filenames { + file := f + eg.Go(func() error { + testCase, err := dmchaos.NewCase(ctx2, cfg.Addr, file, filepath.Join(cfg.ConfigDir, file+".yaml")) + if err != nil { + return err + } + + return testCase.Run(ctx2) + }) + } + return eg.Wait() +} diff --git a/engine/chaos/cases/cases.go b/engine/chaos/cases/cases.go index fda6b006b9..dc185dc54e 100644 --- a/engine/chaos/cases/cases.go +++ b/engine/chaos/cases/cases.go @@ -21,7 +21,7 @@ import ( type caseFn func(context.Context, *config) error -var cases = []caseFn{runFakeJobCase} +var cases = []caseFn{runFakeJobCase, runDMJobCases} func runCases(ctx context.Context, cfg *config) error { errg, ctx := errgroup.WithContext(ctx) diff --git a/engine/chaos/cases/conf/dmjob.yaml b/engine/chaos/cases/conf/dmjob.yaml new file mode 100644 index 0000000000..0abe3ec605 --- /dev/null +++ b/engine/chaos/cases/conf/dmjob.yaml @@ -0,0 +1,31 @@ +task-mode: all +target-database: + host: tidb-0.tidb + port: 4000 + user: root + password: "" +upstreams: + - db-config: + host: "mysql57-0.sources" + port: 3306 + user: root + password: "" + source-id: replica-01 + block-allow-list: balist-01 + - db-config: + host: "mysql8-0.sources" + port: 3306 + user: root + password: "" + source-id: replica-02 + block-allow-list: balist-01 + - db-config: + host: "mariadb-0.sources" + port: 3306 + user: root + password: "" + source-id: replica-03 + block-allow-list: balist-01 +block-allow-list: + balist-01: + do-dbs: ["dmjob"] \ No newline at end of file diff --git a/engine/chaos/cases/config.go b/engine/chaos/cases/config.go index 9c0eb6f718..5985f20c26 100644 --- a/engine/chaos/cases/config.go +++ b/engine/chaos/cases/config.go @@ -30,6 +30,8 @@ type config struct { MasterCount int `toml:"master-count" yaml:"master-count" json:"master-count"` WorkerCount int `toml:"worker-count" yaml:"worker-count" json:"worker-count"` + + ConfigDir string `toml:"config-dir" yaml:"config-dir" json:"config-dir"` } // newConfig creates a config for this chaos testing suite. @@ -46,6 +48,7 @@ func newConfig() *config { fs.IntVar(&cfg.MasterCount, "master-count", 3, "expect count of server-master") fs.IntVar(&cfg.WorkerCount, "worker-count", 4, "expect count of executor") + fs.StringVar(&cfg.ConfigDir, "config-dir", "/", "path of the source and task config files") return cfg } diff --git a/engine/chaos/cases/dm/case.go b/engine/chaos/cases/dm/case.go new file mode 100644 index 0000000000..f4b4ea32f5 --- /dev/null +++ b/engine/chaos/cases/dm/case.go @@ -0,0 +1,414 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dm
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"math/rand"
+	"os"
+	"time"
+
+	"github.com/go-sql-driver/mysql"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb-tools/pkg/diff"
+	"github.com/pingcap/tidb/errno"
+	"github.com/pingcap/tidb/util/dbutil"
+	sqlconfig "github.com/pingcap/tiflow/dm/simulator/config"
+	"github.com/pingcap/tiflow/dm/simulator/mcp"
+	sqlgen "github.com/pingcap/tiflow/dm/simulator/sqlgen"
+	pb "github.com/pingcap/tiflow/engine/enginepb"
+	"github.com/pingcap/tiflow/engine/jobmaster/dm/config"
+	"github.com/pingcap/tiflow/engine/test/e2e"
+	"github.com/pingcap/tiflow/pkg/retry"
+	"go.uber.org/zap"
+)
+
+const (
+	tableNum = 5
+	rowNum   = 1000
+	batch    = 100
+	// diffTimes * diffInterval = 5 minutes
+	diffTimes    = 150
+	diffInterval = 2 * time.Second
+)
+
+// Case is a data migration test case with one or more sources.
+type Case struct {
+	addr     string
+	cfgBytes []byte
+	sources  []*dbConn
+	target   *dbConn
+	tables   []string
+	jobID    string
+	name     string
+
+	// source -> table -> mcp
+	mcps []map[string]*mcp.ModificationCandidatePool
+	// source -> table -> generator
+	generators []map[string]sqlgen.SQLGenerator
+	// table -> key -> struct{}
+	keySet map[string]map[string]struct{}
+
+	result []int
+}
+
+// NewCase creates a new test case.
+func NewCase(ctx context.Context, addr string, name string, cfgPath string) (*Case, error) {
+	cfgBytes, err := os.ReadFile(cfgPath)
+	if err != nil {
+		return nil, err
+	}
+
+	var jobCfg config.JobCfg
+	if err := jobCfg.Decode(cfgBytes); err != nil {
+		return nil, err
+	}
+
+	c := &Case{
+		sources:    make([]*dbConn, 0, len(jobCfg.Upstreams)),
+		cfgBytes:   cfgBytes,
+		addr:       addr,
+		name:       name,
+		mcps:       make([]map[string]*mcp.ModificationCandidatePool, 0, 3),
+		generators: make([]map[string]sqlgen.SQLGenerator, 0, 3),
+		keySet:     make(map[string]map[string]struct{}, tableNum),
+		result:     make([]int, 3),
+	}
+	for _, upstream := range jobCfg.Upstreams {
+		source, err := newDBConn(ctx, upstream.DBCfg, name)
+		if err != nil {
+			return nil, err
+		}
+		c.sources = append(c.sources, source)
+	}
+	target, err := newDBConn(ctx, jobCfg.TargetDB, name)
+	if err != nil {
+		return nil, err
+	}
+	c.target = target
+
+	// init the table configs, SQL generators and UK pools for every source
+	for range c.sources {
+		generators := make(map[string]sqlgen.SQLGenerator)
+		mcps := make(map[string]*mcp.ModificationCandidatePool)
+		for i := 0; i < tableNum; i++ {
+			tableName := fmt.Sprintf("tb%d", i)
+			tableConfig := &sqlconfig.TableConfig{
+				DatabaseName: c.name,
+				TableName:    tableName,
+				Columns: []*sqlconfig.ColumnDefinition{
+					{
+						ColumnName: "id",
+						DataType:   "int",
+						DataLen:    11,
+					},
+					{
+						ColumnName: "name",
+						DataType:   "varchar",
+						DataLen:    255,
+					},
+					{
+						ColumnName: "age",
+						DataType:   "int",
+						DataLen:    11,
+					},
+					{
+						ColumnName: "team_id",
+						DataType:   "int",
+						DataLen:    11,
+					},
+				},
+				UniqueKeyColumnNames: []string{"id"},
+			}
+			generators[tableName] = sqlgen.NewSQLGeneratorImpl(tableConfig)
+			mcps[tableName] = mcp.NewModificationCandidatePool(100000000)
+			c.keySet[tableName] = make(map[string]struct{})
+			c.tables = append(c.tables, tableName)
+		}
+		c.generators = append(c.generators, generators)
+		c.mcps = append(c.mcps, mcps)
+	}
+
+	
return c, nil +} + +// Run runs a test case. +func (c *Case) Run(ctx context.Context) error { + defer func() { + log.L().Info("finish run case", zap.String("name", c.name), zap.String("job_id", c.jobID), zap.Int("insert", c.result[0]), zap.Int("update", c.result[1]), zap.Int("delete", c.result[2])) + }() + if err := c.genFullData(); err != nil { + return err + } + if err := c.createJob(ctx); err != nil { + return err + } + if err := c.diffDataLoop(ctx); err != nil { + return err + } + log.L().Info("full mode of the task has completed", zap.String("name", c.name), zap.String("job_id", c.jobID)) + return c.incrLoop(ctx) +} + +func (c *Case) createJob(ctx context.Context) error { + return retry.Do(ctx, func() error { + jobID, err := e2e.CreateJobViaHTTP(ctx, c.addr, "chaos-dm-test", "project-dm", pb.Job_DM, c.cfgBytes) + if err != nil { + log.L().Error("create job failed", zap.String("name", c.name), zap.Error(err)) + return err + } + c.jobID = jobID + return nil + }, + retry.WithBackoffBaseDelay(1000 /* 1 second */), + retry.WithBackoffMaxDelay(8000 /* 8 seconds */), + retry.WithMaxTries(15 /* fail after 103 seconds */), + ) +} + +func (c *Case) genFullData() error { + log.L().Info("start generate full data", zap.String("name", c.name), zap.String("job_id", c.jobID)) + for source, generators := range c.generators { + for table, generator := range generators { + if _, err := c.sources[source].ExecuteSQLs("CREATE DATABASE IF NOT EXISTS "+c.name+" CHARSET latin1", "USE "+c.name); err != nil { + return err + } + if _, err := c.sources[source].ExecuteSQLs(generator.GenCreateTable()); err != nil { + return err + } + sqls := make([]string, 0, rowNum) + for j := 0; j < rowNum; j++ { + sql, uk, err := generator.GenInsertRow() + if err != nil { + return err + } + // key already exists + if _, ok := c.keySet[table][uk.GetValueHash()]; ok { + continue + } + if err := c.mcps[source][table].AddUK(uk); err != nil { + return err + } + c.keySet[table][uk.GetValueHash()] = struct{}{} + sqls = append(sqls, sql) + } + if _, err := c.sources[source].ExecuteSQLs(sqls...); err != nil { + return err + } + } + } + return nil +} + +func (c *Case) diffData(ctx context.Context) (bool, error) { + log.L().Info("start diff data", zap.String("name", c.name), zap.String("job_id", c.jobID)) + for _, tableName := range c.tables { + row := c.target.db.DB.QueryRowContext(ctx, fmt.Sprintf("SELECT count(1) FROM %s", dbutil.TableName(c.target.currDB, tableName))) + if row.Err() != nil { + if row.Err() == context.DeadlineExceeded { + return false, nil + } + return false, row.Err() + } + var count int + if err := row.Scan(&count); err != nil { + return false, err + } + var totalCount int + for _, mcps := range c.mcps { + totalCount += mcps[tableName].Len() + } + if count != totalCount { + log.Error("data is not same", zap.String("name", c.name), zap.String("job_id", c.jobID), zap.Int("downstream", count), zap.Int("upstream", totalCount)) + return false, nil + } + } + return true, nil +} + +func (c *Case) diffDataLoop(ctx context.Context) error { + for i := 0; i < diffTimes; i++ { + select { + case <-ctx.Done(): + return nil + case <-time.After(diffInterval): + if same, err := c.diffData(ctx); err != nil { + if ignoreErrNoSuchTable(err) { + continue + } + return err + } else if same { + return nil + } + } + } + sourceDBs := make([]*sql.DB, 0, len(c.sources)) + for _, s := range c.sources { + sourceDBs = append(sourceDBs, s.db.DB) + } + return syncDiffInspector(ctx, c.name, c.tables, c.target.db.DB, sourceDBs...) 
+}
+
+// randDML generates DML (INSERT, UPDATE or DELETE).
+func (c *Case) randDML(source int, table string) (string, error) {
+	generator := c.generators[source][table]
+	pool := c.mcps[source][table]
+	t := rand.Intn(3)
+	key := pool.NextUK()
+	// no rows yet, force an INSERT
+	if key == nil {
+		t = 0
+	}
+	c.result[t]++
+	switch t {
+	case 0:
+		sql, uk, err := generator.GenInsertRow()
+		if err != nil {
+			return "", err
+		}
+		_, ok := c.keySet[table][uk.GetValueHash()]
+		for ok {
+			sql, uk, err = generator.GenInsertRow()
+			if err != nil {
+				return "", err
+			}
+			_, ok = c.keySet[table][uk.GetValueHash()]
+		}
+		if err := c.mcps[source][table].AddUK(uk); err != nil {
+			return "", err
+		}
+		c.keySet[table][uk.GetValueHash()] = struct{}{}
+		return sql, nil
+	case 1:
+		return generator.GenUpdateRow(key)
+	default:
+		sql, err := generator.GenDeleteRow(key)
+		if err != nil {
+			return "", err
+		}
+		delete(c.keySet[table], key.GetValueHash())
+		err = pool.DeleteUK(key)
+		return sql, err
+	}
+}
+
+func (c *Case) genIncrData(ctx context.Context) error {
+	log.L().Info("start generate incremental data", zap.String("name", c.name), zap.String("job_id", c.jobID))
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+		}
+		source := rand.Intn(len(c.sources))
+		tableName := c.tables[rand.Intn(tableNum)]
+
+		sqls := make([]string, 0, batch)
+		for i := 0; i < batch; i++ {
+			sql, err := c.randDML(source, tableName)
+			if err != nil {
+				return err
+			}
+			sqls = append(sqls, sql)
+		}
+		if _, err := c.sources[source].ExecuteSQLs(sqls...); err != nil {
+			return err
+		}
+	}
+}
+
+func (c *Case) incrLoop(ctx context.Context) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+		}
+		ctx2, cancel := context.WithTimeout(ctx, time.Second*10)
+		err := c.genIncrData(ctx2)
+		cancel()
+		if err != nil {
+			return err
+		}
+		if err := c.diffDataLoop(ctx); err != nil {
+			return err
+		}
+	}
+}
+
+func ignoreErrNoSuchTable(err error) bool {
+	err = errors.Cause(err)
+	mysqlErr, ok := err.(*mysql.MySQLError)
+	if !ok {
+		return false
+	}
+
+	switch mysqlErr.Number {
+	case errno.ErrNoSuchTable:
+		return true
+	default:
+		return false
+	}
+}
+
+func syncDiffInspector(ctx context.Context, schema string, tables []string, targetDB *sql.DB, sourceDBs ...*sql.DB) error {
+	for _, table := range tables {
+		sourceTables := make([]*diff.TableInstance, 0, len(sourceDBs))
+		for i, sourceDB := range sourceDBs {
+			sourceTables = append(sourceTables, &diff.TableInstance{
+				Conn:       sourceDB,
+				Schema:     schema,
+				Table:      table,
+				InstanceID: fmt.Sprintf("source-%d", i),
+			})
+		}
+
+		targetTable := &diff.TableInstance{
+			Conn:       targetDB,
+			Schema:     schema,
+			Table:      table,
+			InstanceID: "target",
+		}
+
+		td := &diff.TableDiff{
+			SourceTables:     sourceTables,
+			TargetTable:      targetTable,
+			ChunkSize:        1000,
+			Sample:           100,
+			CheckThreadCount: 1,
+			UseChecksum:      true,
+			TiDBStatsSource:  targetTable,
+			CpDB:             targetDB,
+		}
+
+		structEqual, dataEqual, err := td.Equal(ctx, func(dml string) error {
+			return nil // ignore the generated fix-up DML; only the equality result matters
+		})
+
+		if errors.Cause(err) == context.Canceled || errors.Cause(err) == context.DeadlineExceeded {
+			return nil
+		}
+		if err != nil {
+			return err // surface other comparison errors instead of misreporting them as a struct mismatch
+		}
+		if !structEqual {
+			return errors.Errorf("different struct for table %s", dbutil.TableName(schema, table))
+		} else if !dataEqual {
+			return errors.Errorf("different data for table %s", dbutil.TableName(schema, table))
+		}
+		log.L().Info("data equal for table", zap.String("schema", schema), zap.String("table", table))
+	}
+
+	return nil
+}
diff --git a/engine/chaos/cases/dm/db.go b/engine/chaos/cases/dm/db.go
new file mode 100644
index 
0000000000..6f48b9a969 --- /dev/null +++ b/engine/chaos/cases/dm/db.go @@ -0,0 +1,106 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dm + +import ( + "context" + "fmt" + "time" + + "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/errno" + dmconfig "github.com/pingcap/tiflow/dm/config" + "github.com/pingcap/tiflow/dm/pkg/conn" + tcontext "github.com/pingcap/tiflow/dm/pkg/context" + "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/retry" + "go.uber.org/zap" +) + +type dbConn struct { + db *conn.BaseDB + con *conn.BaseConn + currDB string +} + +func newDBConn(ctx context.Context, cfg *dmconfig.DBConfig, currDB string) (*dbConn, error) { + db, err := conn.DefaultDBProvider.Apply(cfg) + if err != nil { + return nil, err + } + con, err := db.GetBaseConn(ctx) + if err != nil { + return nil, err + } + + return &dbConn{ + db: db, + con: con, + currDB: currDB, + }, nil +} + +func (dc *dbConn) resetConn(ctx context.Context) error { + err := dc.db.CloseBaseConn(dc.con) + if err != nil { + log.L().Warn("fail to close connection", zap.Error(err)) + } + dc.con, err = dc.db.GetBaseConn(ctx) + if err != nil { + return err + } + _, err = dc.con.ExecuteSQL(tcontext.NewContext(ctx, log.L()), nil, "chaos-cases", []string{fmt.Sprintf("USE %s", dc.currDB)}) + return err +} + +func ignoreExecSQLError(err error) bool { + err = errors.Cause(err) // check the original error + mysqlErr, ok := err.(*mysql.MySQLError) + if !ok { + return false + } + + switch mysqlErr.Number { + case errno.ErrDupEntry: // HACK: we tolerate `invalid connection`, then `Duplicate entry` may be reported. + return true + default: + return false + } +} + +func (dc *dbConn) ExecuteSQLs(queries ...string) (int, error) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + params := retry.Params{ + RetryCount: 3, + FirstRetryDuration: time.Second, + BackoffStrategy: retry.Stable, + IsRetryableFn: func(_ int, err error) bool { + if retry.IsConnectionError(err) { + // HACK: for some errors like `invalid connection`, `sql: connection is already closed`, we can ignore them just for testing. 
+ err = dc.resetConn(ctx) + return err == nil + } + return false + }, + } + + ret, _, err := dc.con.ApplyRetryStrategy(tcontext.NewContext(ctx, log.L()), params, + func(tctx *tcontext.Context) (interface{}, error) { + ret, err2 := dc.con.ExecuteSQLWithIgnoreError(tctx, nil, "chaos-cases", ignoreExecSQLError, queries) + return ret, err2 + }) + return ret.(int), err +} diff --git a/engine/chaos/cases/main.go b/engine/chaos/cases/main.go index 80b0d65e75..025e738d59 100644 --- a/engine/chaos/cases/main.go +++ b/engine/chaos/cases/main.go @@ -66,7 +66,7 @@ func main() { rand.Seed(time.Now().UnixNano()) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), cfg.Duration) defer cancel() sc := make(chan os.Signal, 1) diff --git a/engine/chaos/manifests/Dockerfile b/engine/chaos/manifests/Dockerfile index 7bd4f31f3e..b8976de2e2 100644 --- a/engine/chaos/manifests/Dockerfile +++ b/engine/chaos/manifests/Dockerfile @@ -2,6 +2,7 @@ FROM alpine:3.14 ADD tiflow /tiflow ADD tiflow-chaos-case /tiflow-chaos-case +ADD engine-conf /engine-conf RUN chmod a+x /tiflow /tiflow-chaos-case diff --git a/engine/chaos/manifests/cases.yaml b/engine/chaos/manifests/cases.yaml index e5fed30804..4cfa08ccc5 100644 --- a/engine/chaos/manifests/cases.yaml +++ b/engine/chaos/manifests/cases.yaml @@ -11,6 +11,7 @@ spec: imagePullPolicy: IfNotPresent command: - "/tiflow-chaos-case" + - "--config-dir=/engine-conf" - "--duration=20m" restartPolicy: Never backoffLimit: 0 # fail immediately diff --git a/engine/chaos/manifests/network-emulation-dataflow.yaml b/engine/chaos/manifests/network-emulation-dataflow.yaml index cda46b7051..f69de5a1a9 100644 --- a/engine/chaos/manifests/network-emulation-dataflow.yaml +++ b/engine/chaos/manifests/network-emulation-dataflow.yaml @@ -1,17 +1,20 @@ ---- -# A Network Loss action causes network packets to drop randomly apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos +kind: Schedule metadata: name: network-loss-dataflow labels: app: network-loss-dataflow spec: - action: loss - mode: one - selector: - pods: - default: # default namespace + schedule: 2-59/6 * * * * + type: NetworkChaos + historyLimit: 5 + concurrencyPolicy: Forbid + networkChaos: + action: loss + mode: one + selector: + pods: + default: - chaos-server-master-0 - chaos-server-master-1 - chaos-server-master-2 @@ -19,95 +22,7 @@ spec: - chaos-executor-1 - chaos-executor-2 - chaos-executor-3 - loss: - loss: "25" - correlation: "25" - duration: "30s" - scheduler: - cron: "2-59/6 * * * *" # At every 6th minute from 2 through 59, (2, 8, 14, ...) - - -# A Network Delay action causes delays in message sending ---- -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -metadata: - name: network-delay-dataflow - labels: - app: network-delay-dataflow -spec: - action: delay - mode: one - selector: - pods: - default: # default namespace - - chaos-server-master-0 - - chaos-server-master-1 - - chaos-server-master-2 - - chaos-executor-0 - - chaos-executor-1 - - chaos-executor-2 - - chaos-executor-3 - delay: - latency: "150ms" - correlation: "25" - jitter: "150ms" - duration: "30s" - scheduler: - cron: "3-59/6 * * * *" # At every 6th minute from 3 through 59, (3, 9, 15, ...) 
- ---- -# A Network Duplicate action causes packet duplication -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -metadata: - name: network-duplicate-dataflow - labels: - app: network-duplicate-dataflow -spec: - action: duplicate - mode: one - selector: - pods: - default: # default namespace - - chaos-server-master-0 - - chaos-server-master-1 - - chaos-server-master-2 - - chaos-executor-0 - - chaos-executor-1 - - chaos-executor-2 - - chaos-executor-3 - duplicate: - duplicate: "40" - correlation: "25" - duration: "30s" - scheduler: - cron: "4-59/6 * * * *" # At every 6th minute from 4 through 59, (4, 10, 16, ...) - ---- -# A Network Corrupt action causes packet corruption -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -metadata: - name: network-corrupt-dataflow - labels: - app: network-corrupt-dataflow -spec: - action: corrupt - mode: one - selector: - pods: - default: # default namespace - - chaos-server-master-0 - - chaos-server-master-1 - - chaos-server-master-2 - - chaos-executor-0 - - chaos-executor-1 - - chaos-executor-2 - - chaos-executor-3 - corrupt: - corrupt: "40" - correlation: "25" - duration: "30s" - scheduler: - cron: "5-59/6 * * * *" # At every 5th minute from 5 through 59, (5, 11, 17, ...) + loss: + loss: "25" + correlation: "25" + duration: 30s diff --git a/engine/chaos/manifests/network-partition-dataflow.yaml b/engine/chaos/manifests/network-partition-dataflow.yaml index 8bf3416563..786c3abb90 100644 --- a/engine/chaos/manifests/network-partition-dataflow.yaml +++ b/engine/chaos/manifests/network-partition-dataflow.yaml @@ -1,71 +1,24 @@ ---- -# network partition between server-master and executor apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos +kind: Schedule metadata: name: network-partition-dataflow-master-executor labels: app: network-partition-dataflow-master-executor spec: - action: partition - mode: one - selector: - labelSelectors: - "app": "chaos-server-master" - direction: both - target: - selector: - labelSelectors: - "app": "chaos-executor" - mode: one - duration: "20s" - scheduler: - cron: "2-59/4 * * * *" # At every 4th minute from 2 through 59, (2, 6, 10, ...) - ---- -# network partition between dataflow server-master members -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -metadata: - name: network-partition-dataflow-master-master - labels: - app: network-partition-dataflow-master-master -spec: - action: partition - mode: one - selector: - labelSelectors: - "app": "chaos-server-master" - direction: both - target: - selector: - labelSelectors: - "app": "chaos-server-master" + schedule: 2-59/4 * * * * + type: NetworkChaos + historyLimit: 5 + concurrencyPolicy: Forbid + networkChaos: + action: partition mode: one - duration: "20s" - scheduler: - cron: "3-59/4 * * * *" # At every 4th minute from 3 through 59, (3, 7, 11, ...) - ---- -# network partition between dataflow executor members -apiVersion: chaos-mesh.org/v1alpha1 -kind: NetworkChaos -metadata: - name: network-partition-dataflow-executor-executor - labels: - app: network-partition-dataflow-executor-executor -spec: - action: partition - mode: one - selector: - labelSelectors: - "app": "chaos-executor" - direction: both - target: selector: labelSelectors: - "app": "chaos-executor" - mode: one - duration: "20s" - scheduler: - cron: "1-59/4 * * * *" # At every 4th minute from 1 through 59, (1, 5, 9, ...) 
+ app: chaos-server-master + direction: both + target: + selector: + labelSelectors: + app: chaos-executor + mode: one + duration: 20s diff --git a/engine/chaos/manifests/pod-failure-dataflow.yaml b/engine/chaos/manifests/pod-failure-dataflow.yaml index 952b68b817..2a2934b30c 100644 --- a/engine/chaos/manifests/pod-failure-dataflow.yaml +++ b/engine/chaos/manifests/pod-failure-dataflow.yaml @@ -1,16 +1,21 @@ apiVersion: chaos-mesh.org/v1alpha1 -kind: PodChaos +kind: Schedule metadata: name: pod-failure-dataflow labels: app: pod-failure-dataflow spec: - action: pod-failure - mode: one - duration: "30s" - selector: - pods: - default: # default namespace + schedule: '@every 2m' + type: PodChaos + historyLimit: 5 + concurrencyPolicy: Forbid + podChaos: + action: pod-failure + mode: one + duration: 30s + selector: + pods: + default: - chaos-server-master-0 - chaos-server-master-1 - chaos-server-master-2 @@ -18,5 +23,3 @@ spec: - chaos-executor-1 - chaos-executor-2 - chaos-executor-3 - scheduler: - cron: "@every 2m" diff --git a/engine/chaos/manifests/pod-kill-dataflow.yaml b/engine/chaos/manifests/pod-kill-dataflow.yaml index d3d12259a1..924f149cf3 100644 --- a/engine/chaos/manifests/pod-kill-dataflow.yaml +++ b/engine/chaos/manifests/pod-kill-dataflow.yaml @@ -1,16 +1,21 @@ apiVersion: chaos-mesh.org/v1alpha1 -kind: PodChaos +kind: Schedule metadata: name: pod-kill-dataflow labels: app: pod-kill-dataflow spec: - action: pod-kill - mode: one - gracePeriod: 30 - selector: - pods: - default: # default namespace + schedule: '@every 1m' + type: PodChaos + historyLimit: 5 + concurrencyPolicy: Forbid + podChaos: + action: pod-kill + mode: one + gracePeriod: 30 + selector: + pods: + default: - chaos-server-master-0 - chaos-server-master-1 - chaos-server-master-2 @@ -18,5 +23,3 @@ spec: - chaos-executor-1 - chaos-executor-2 - chaos-executor-3 - scheduler: - cron: "@every 2m" diff --git a/engine/chaos/manifests/time-shift-dataflow.yaml b/engine/chaos/manifests/time-shift-dataflow.yaml index 239cd8622a..58ba336480 100644 --- a/engine/chaos/manifests/time-shift-dataflow.yaml +++ b/engine/chaos/manifests/time-shift-dataflow.yaml @@ -1,16 +1,21 @@ apiVersion: chaos-mesh.org/v1alpha1 -kind: TimeChaos +kind: Schedule metadata: name: time-shift-dataflow labels: app: time-shift-dataflow spec: - mode: "random-max-percent" - value: "60" - duration: "30s" - selector: - pods: - default: # default namespace + schedule: '@every 2m' + type: TimeChaos + historyLimit: 5 + concurrencyPolicy: Forbid + timeChaos: + mode: random-max-percent + value: "60" + duration: 30s + selector: + pods: + default: - chaos-server-master-0 - chaos-server-master-1 - chaos-server-master-2 @@ -18,8 +23,6 @@ spec: - chaos-executor-1 - chaos-executor-2 - chaos-executor-3 - timeOffset: '-10m' - clockIds: + timeOffset: -10m + clockIds: - CLOCK_REALTIME - scheduler: - cron: "@every 2m" diff --git a/engine/chaos/scripts/check-case.sh b/engine/chaos/scripts/check-case.sh index d7cf739541..7d82d6c1ab 100755 --- a/engine/chaos/scripts/check-case.sh +++ b/engine/chaos/scripts/check-case.sh @@ -1,7 +1,7 @@ #!/bin/bash completed=false -for i in {1..20}; do +for i in {1..22}; do kubectl wait --for=condition=complete job/chaos-test-case --timeout=1m if [ $? 
-eq 0 ]; then completed=true diff --git a/engine/jobmaster/dm/worker_manager.go b/engine/jobmaster/dm/worker_manager.go index c8b350b9c4..f98928c702 100644 --- a/engine/jobmaster/dm/worker_manager.go +++ b/engine/jobmaster/dm/worker_manager.go @@ -245,7 +245,7 @@ func (wm *WorkerManager) checkAndScheduleWorkers(ctx context.Context, job *metad } else if !runningWorker.RunAsExpected() { wm.logger.Info("unexpected worker status", zap.String("task_id", taskID), zap.Stringer("worker_stage", runningWorker.Stage), zap.Stringer("unit", runningWorker.Unit), zap.Stringer("next_unit", nextUnit)) } else { - wm.logger.Info("switch to next unit", zap.String("task_id", taskID), zap.Stringer("next_unit", runningWorker.Unit)) + wm.logger.Info("switch to next unit", zap.String("task_id", taskID), zap.Stringer("next_unit", nextUnit)) } var resources []resModel.ResourceID
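
For context on dbConn.ExecuteSQLs above: the retry strategy treats only connection errors as retryable, resets the connection before the next attempt, and relies on ignoreExecSQLError to tolerate the duplicate-key errors that a replayed INSERT can produce after an `invalid connection`. A self-contained sketch of that retry shape (execWithReset and errBadConn are illustrative stand-ins, not part of the dm/pkg/retry API):

package main

import (
	"errors"
	"fmt"
	"time"
)

// errBadConn stands in for a driver-level connection error.
var errBadConn = errors.New("invalid connection")

// execWithReset retries an operation only when it fails with a connection
// error, rebuilding the connection before the next attempt; any other error
// fails immediately, mirroring the IsRetryableFn used by ExecuteSQLs.
func execWithReset(exec func() error, reset func() error, retryCount int) error {
	var err error
	for i := 0; i < retryCount; i++ {
		if err = exec(); err == nil {
			return nil
		}
		if !errors.Is(err, errBadConn) {
			return err // not a connection error: do not retry
		}
		if rerr := reset(); rerr != nil {
			return rerr
		}
		time.Sleep(time.Second) // retry.Stable: fixed delay between attempts
	}
	return err
}

func main() {
	attempts := 0
	err := execWithReset(
		func() error {
			attempts++
			if attempts < 3 {
				return errBadConn // first two attempts hit a dropped connection
			}
			return nil
		},
		func() error {
			fmt.Println("resetting connection")
			return nil
		},
		3,
	)
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}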