diff --git a/pkg/snapshot/generator/postgres/schema/pgdumprestore/snapshot_pg_dump_restore_generator.go b/pkg/snapshot/generator/postgres/schema/pgdumprestore/snapshot_pg_dump_restore_generator.go index d574e70f..93fec62b 100644 --- a/pkg/snapshot/generator/postgres/schema/pgdumprestore/snapshot_pg_dump_restore_generator.go +++ b/pkg/snapshot/generator/postgres/schema/pgdumprestore/snapshot_pg_dump_restore_generator.go @@ -76,6 +76,7 @@ type dump struct { filtered []byte cleanupPart []byte indicesAndConstraints []byte + views []byte sequences []string roles map[string]role eventTriggers []byte @@ -240,9 +241,15 @@ func (s *SnapshotGenerator) CreateSnapshot(ctx context.Context, ss *snapshot.Sna s.logger.Info("restoring schema indices and constraints", loglib.Fields{"schemaTables": ss.SchemaTables}) if s.snapshotTracker != nil { - return s.restoreIndicesWithTracking(ctx, dump.indicesAndConstraints) + if err := s.restoreIndicesWithTracking(ctx, dump.indicesAndConstraints); err != nil { + return err + } + } else if err := s.restoreDump(ctx, dump.indicesAndConstraints); err != nil { + return err } - return s.restoreDump(ctx, dump.indicesAndConstraints) + + s.logger.Info("restoring views") + return s.restoreDump(ctx, dump.views) } func (s *SnapshotGenerator) Close() error { @@ -290,6 +297,7 @@ func (s *SnapshotGenerator) dumpSchema(ctx context.Context, schemaTables map[str s.dumpToFile(s.getDumpFileName("-filtered"), pgdumpOpts, parsedDump.filtered) s.dumpToFile(s.getDumpFileName("-indices-constraints"), pgdumpOpts, parsedDump.indicesAndConstraints) + s.dumpToFile(s.getDumpFileName("-views"), pgdumpOpts, parsedDump.views) // only if clean is enabled, produce the clean up part of the dump if s.optionGenerator.cleanTargetDB { @@ -406,10 +414,12 @@ func (s *SnapshotGenerator) parseDump(d []byte) *dump { indicesAndConstraints := strings.Builder{} filteredDump := strings.Builder{} eventTriggersDump := strings.Builder{} + viewsDump := strings.Builder{} sequenceNames := []string{} dumpRoles := make(map[string]role) alterTable := "" createEventTrigger := "" + createView := "" for scanner.Scan() { line := scanner.Text() switch { @@ -446,11 +456,27 @@ func (s *SnapshotGenerator) parseDump(d []byte) *dump { eventTriggersDump.WriteString(line) eventTriggersDump.WriteString("\n") + case strings.HasPrefix(line, "CREATE VIEW"), + strings.HasPrefix(line, "CREATE MATERIALIZED VIEW"): + createView = line + fallthrough + case createView != "": + if strings.HasSuffix(line, ";") { + viewsDump.WriteString(line) + viewsDump.WriteString("\n\n") + createView = "" + continue + } + viewsDump.WriteString(line) + viewsDump.WriteString("\n") + case strings.Contains(line, `\connect`): indicesAndConstraints.WriteString(line) indicesAndConstraints.WriteString("\n\n") filteredDump.WriteString(line) filteredDump.WriteString("\n") + viewsDump.WriteString(line) + viewsDump.WriteString("\n\n") case strings.HasPrefix(line, "CREATE INDEX"), strings.HasPrefix(line, "CREATE UNIQUE INDEX"), strings.HasPrefix(line, "CREATE CONSTRAINT"), @@ -515,6 +541,7 @@ func (s *SnapshotGenerator) parseDump(d []byte) *dump { full: d, filtered: []byte(filteredDump.String()), indicesAndConstraints: []byte(indicesAndConstraints.String()), + views: []byte(viewsDump.String()), sequences: sequenceNames, roles: dumpRoles, eventTriggers: []byte(eventTriggersDump.String()), diff --git a/pkg/snapshot/generator/postgres/schema/pgdumprestore/snapshot_pg_dump_restore_generator_test.go b/pkg/snapshot/generator/postgres/schema/pgdumprestore/snapshot_pg_dump_restore_generator_test.go index eea95c04..66bc68c6 100644 --- a/pkg/snapshot/generator/postgres/schema/pgdumprestore/snapshot_pg_dump_restore_generator_test.go +++ b/pkg/snapshot/generator/postgres/schema/pgdumprestore/snapshot_pg_dump_restore_generator_test.go @@ -23,11 +23,13 @@ func TestSnapshotGenerator_CreateSnapshot(t *testing.T) { t.Parallel() schemaDump := []byte("schema dump\nCREATE SEQUENCE test.test_sequence\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\nCREATE INDEX a;\n") + schemaDumpWithViews := []byte("schema dump\nCREATE SEQUENCE test.test_sequence\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\nCREATE VIEW public.test_view AS\n SELECT 1;\nCREATE INDEX a;\n") schemaDumpNoSequences := []byte("schema dump\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\nCREATE INDEX a;\n") filteredDumpNoSequences := []byte("schema dump\nGRANT ALL ON SCHEMA \"public\" TO \"test_role\";\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\n") filteredDump := []byte("schema dump\nCREATE SEQUENCE test.test_sequence\nGRANT ALL ON SCHEMA \"public\" TO \"test_role\";\nALTER TABLE public.test_table OWNER TO test_role;\nGRANT ALL ON TABLE public.test_table TO test_role2;\n") sequenceDump := []byte("sequence dump\n") indexDump := []byte("CREATE INDEX a;\n\n") + testViewsDump := []byte("CREATE VIEW public.test_view AS\n SELECT 1;\n\n") rolesDumpOriginal := []byte("roles dump\nCREATE ROLE postgres\nCREATE ROLE test_role\nCREATE ROLE test_role2\nALTER ROLE test_role3 INHERIT FROM test_role;\n") rolesDumpFiltered := []byte("roles dump\nCREATE ROLE test_role\nCREATE ROLE test_role2\nGRANT \"test_role\" TO CURRENT_USER;\n") cleanupDump := []byte("cleanup dump\n") @@ -281,6 +283,54 @@ func TestSnapshotGenerator_CreateSnapshot(t *testing.T) { wantErr: nil, }, + { + name: "ok - with views", + snapshot: &snapshot.Snapshot{ + SchemaTables: map[string][]string{ + testSchema: {testTable}, + }, + }, + conn: validQuerier(), + pgdumpFn: newMockPgdump(func(_ context.Context, i uint, po pglib.PGDumpOptions) ([]byte, error) { + switch i { + case 1: + return schemaDumpWithViews, nil + case 2: + return sequenceDump, nil + default: + return nil, fmt.Errorf("unexpected call to pgdumpFn: %d", i) + } + }), + pgdumpallFn: newMockPgdumpall(func(_ context.Context, i uint, po pglib.PGDumpAllOptions) ([]byte, error) { + switch i { + case 1: + return rolesDumpOriginal, nil + default: + return nil, fmt.Errorf("unexpected call to pgdumpallFn: %d", i) + } + }), + pgrestoreFn: newMockPgrestore(func(_ context.Context, i uint, po pglib.PGRestoreOptions, dump []byte) (string, error) { + switch i { + case 1: + require.Equal(t, string(schemaCreateDump), string(dump)) + case 2: + require.Equal(t, string(rolesDumpFiltered), string(dump)) + case 3: + require.Equal(t, string(filteredDump), string(dump)) + case 4: + require.Equal(t, string(sequenceDump), string(dump)) + case 5: + require.Equal(t, string(indexDump), string(dump)) + case 6: + require.Equal(t, string(testViewsDump), string(dump)) + default: + return "", fmt.Errorf("unexpected call to pgrestoreFn: %d", i) + } + return "", nil + }), + + wantErr: nil, + }, { name: "ok - no sequence dump", snapshot: &snapshot.Snapshot{ @@ -1237,6 +1287,9 @@ func TestSnapshotGenerator_parseDump(t *testing.T) { wantEventTriggersBytes, err := os.ReadFile("test/test_dump_event_triggers.sql") require.NoError(t, err) + wantViewsBytes, err := os.ReadFile("test/test_dump_views.sql") + require.NoError(t, err) + sg := &SnapshotGenerator{ excludedSecurityLabels: []string{"anon"}, } @@ -1248,12 +1301,15 @@ func TestSnapshotGenerator_parseDump(t *testing.T) { wantConstraintsStr := strings.Trim(string(wantConstraintsBytes), "\n") eventTriggersStr := strings.Trim(string(dump.eventTriggers), "\n") wantEventTriggersStr := strings.Trim(string(wantEventTriggersBytes), "\n") + viewsStr := strings.Trim(string(dump.views), "\n") + wantViewsStr := strings.Trim(string(wantViewsBytes), "\n") wantSequences := []string{`"musicbrainz"."alternative_medium_id_seq"`, `"musicbrainz"."Alternative_medium_id_seq"`} require.Equal(t, wantFilteredStr, filteredStr) require.Equal(t, wantConstraintsStr, constraintsStr) require.Equal(t, wantSequences, dump.sequences) require.Equal(t, wantEventTriggersStr, eventTriggersStr) + require.Equal(t, wantViewsStr, viewsStr) } func TestGetDumpsDiff(t *testing.T) { diff --git a/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump.sql b/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump.sql index b362a014..e9daefe8 100644 --- a/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump.sql +++ b/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump.sql @@ -251,6 +251,37 @@ ALTER TABLE ONLY musicbrainz.alternative_medium ALTER COLUMN id SET DEFAULT next ALTER TABLE ONLY musicbrainz.alternative_release ALTER COLUMN id SET DEFAULT nextval('musicbrainz.alternative_release_id_seq'::regclass); +-- +-- Name: edit_summary; Type: VIEW; Schema: musicbrainz; Owner: postgres +-- + +CREATE VIEW musicbrainz.edit_summary AS + SELECT e.id, + e.editor, + e.type, + e.status + FROM musicbrainz.edit e + GROUP BY e.id; + + +ALTER VIEW musicbrainz.edit_summary OWNER TO postgres; + +-- +-- Name: medium_summary; Type: MATERIALIZED VIEW; Schema: musicbrainz; Owner: postgres +-- + +CREATE MATERIALIZED VIEW musicbrainz.medium_summary AS + SELECT am.id, + am.medium, + am.alternative_release, + am.name + FROM musicbrainz.alternative_medium am + GROUP BY am.id + WITH NO DATA; + + +ALTER MATERIALIZED VIEW musicbrainz.medium_summary OWNER TO postgres; + -- -- Name: alternative_medium alternative_medium_pkey; Type: CONSTRAINT; Schema: musicbrainz; Owner: postgres -- diff --git a/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump_filtered.sql b/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump_filtered.sql index af3387f2..90226f2b 100644 --- a/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump_filtered.sql +++ b/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump_filtered.sql @@ -240,6 +240,20 @@ ALTER TABLE ONLY musicbrainz.alternative_medium ALTER COLUMN id SET DEFAULT next ALTER TABLE ONLY musicbrainz.alternative_release ALTER COLUMN id SET DEFAULT nextval('musicbrainz.alternative_release_id_seq'::regclass); +-- +-- Name: edit_summary; Type: VIEW; Schema: musicbrainz; Owner: postgres +-- + + + + +-- +-- Name: medium_summary; Type: MATERIALIZED VIEW; Schema: musicbrainz; Owner: postgres +-- + + + + -- -- Name: alternative_medium alternative_medium_pkey; Type: CONSTRAINT; Schema: musicbrainz; Owner: postgres -- diff --git a/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump_views.sql b/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump_views.sql new file mode 100644 index 00000000..10e309c4 --- /dev/null +++ b/pkg/snapshot/generator/postgres/schema/pgdumprestore/test/test_dump_views.sql @@ -0,0 +1,18 @@ +\connect test + +CREATE VIEW musicbrainz.edit_summary AS + SELECT e.id, + e.editor, + e.type, + e.status + FROM musicbrainz.edit e + GROUP BY e.id; + +CREATE MATERIALIZED VIEW musicbrainz.medium_summary AS + SELECT am.id, + am.medium, + am.alternative_release, + am.name + FROM musicbrainz.alternative_medium am + GROUP BY am.id + WITH NO DATA; diff --git a/pkg/stream/integration/pg_pg_integration_test.go b/pkg/stream/integration/pg_pg_integration_test.go index a4f00340..511dfa17 100644 --- a/pkg/stream/integration/pg_pg_integration_test.go +++ b/pkg/stream/integration/pg_pg_integration_test.go @@ -27,6 +27,11 @@ type testTableColumn struct { username string } +type idNameRow struct { + id int + name string +} + func Test_PostgresToPostgres(t *testing.T) { if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" { t.Skip("skipping integration test...") @@ -807,6 +812,38 @@ func getTestTableColumns(t *testing.T, ctx context.Context, conn pglib.Querier, return columns } +func getIDRows(t *testing.T, ctx context.Context, conn pglib.Querier, query string) []int { + rows, err := conn.Query(ctx, query) + require.NoError(t, err) + defer rows.Close() + + var ids []int + for rows.Next() { + var id int + err := rows.Scan(&id) + require.NoError(t, err) + ids = append(ids, id) + } + require.NoError(t, rows.Err()) + return ids +} + +func getIDNameRows(t *testing.T, ctx context.Context, conn pglib.Querier, query string) []idNameRow { + rows, err := conn.Query(ctx, query) + require.NoError(t, err) + defer rows.Close() + + var result []idNameRow + for rows.Next() { + var r idNameRow + err := rows.Scan(&r.id, &r.name) + require.NoError(t, err) + result = append(result, r) + } + require.NoError(t, rows.Err()) + return result +} + func getRoles(t *testing.T, ctx context.Context, conn pglib.Querier) []string { rows, err := conn.Query(ctx, "select rolname from pg_roles where rolname not like 'pg_%' and rolname <> 'postgres'") require.NoError(t, err) diff --git a/pkg/stream/integration/snapshot_pg_integration_test.go b/pkg/stream/integration/snapshot_pg_integration_test.go index ba3bf4ed..61af3d52 100644 --- a/pkg/stream/integration/snapshot_pg_integration_test.go +++ b/pkg/stream/integration/snapshot_pg_integration_test.go @@ -105,3 +105,186 @@ func Test_SnapshotToPostgres(t *testing.T) { run("snapshot2pg_batch_integration_test") }) } + +// Test_SnapshotToPostgres_IdentityOnlyTable verifies that tables where the only +// column is an identity column (e.g. id-only lookup tables) are correctly +// snapshotted, and that their values are preserved so that foreign key +// references from child tables remain valid. +func Test_SnapshotToPostgres_IdentityOnlyTable(t *testing.T) { + if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" { + t.Skip("skipping integration test...") + } + + var snapshotPGURL string + pgcleanup, err := testcontainers.SetupPostgresContainer(context.Background(), &snapshotPGURL, testcontainers.Postgres14, "config/postgresql.conf") + require.NoError(t, err) + defer pgcleanup() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + run := func(suffix string, opts ...option) { + parentTable := fmt.Sprintf("parent_identity_%s", suffix) + childTable := fmt.Sprintf("child_identity_%s", suffix) + + // Create a parent table where id is the only column (identity). + // This is the minimal repro: filterRowColumns used to filter out + // all columns, producing zero insert queries. + execQueryWithURL(t, ctx, snapshotPGURL, fmt.Sprintf( + `CREATE TABLE %s(id INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY)`, parentTable)) + execQueryWithURL(t, ctx, snapshotPGURL, fmt.Sprintf( + `INSERT INTO %s(id) VALUES (100),(200),(300)`, parentTable)) + + // Create a child table that references the parent via FK. + execQueryWithURL(t, ctx, snapshotPGURL, fmt.Sprintf( + `CREATE TABLE %s( + id INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + parent_id INTEGER NOT NULL REFERENCES %s(id), + name TEXT + )`, childTable, parentTable)) + execQueryWithURL(t, ctx, snapshotPGURL, fmt.Sprintf( + `INSERT INTO %s(id, parent_id, name) VALUES (1, 100, 'a'),(2, 200, 'b'),(3, 300, 'c')`, childTable)) + + cfg := &stream.Config{ + Listener: testPostgresListenerCfgWithSnapshot(snapshotPGURL, targetPGURL, []string{"*.*"}), + Processor: testPostgresProcessorCfg(opts...), + } + initStream(t, ctx, snapshotPGURL) + runSnapshot(t, ctx, cfg) + + targetConn, err := pglib.NewConn(ctx, targetPGURL) + require.NoError(t, err) + + timer := time.NewTimer(20 * time.Second) + defer timer.Stop() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + validation := func() bool { + // Verify parent table: all 3 rows with exact id values preserved + parentRows := getIDRows(t, ctx, targetConn, fmt.Sprintf("SELECT id FROM %s ORDER BY id", parentTable)) + if len(parentRows) != 3 { + return false + } + require.Equal(t, []int{100, 200, 300}, parentRows) + + // Verify child table: all 3 rows with correct FK references + childRows := getIDNameRows(t, ctx, targetConn, fmt.Sprintf("SELECT parent_id, name FROM %s ORDER BY id", childTable)) + if len(childRows) != 3 { + return false + } + wantChildren := []idNameRow{ + {id: 100, name: "a"}, + {id: 200, name: "b"}, + {id: 300, name: "c"}, + } + require.Equal(t, wantChildren, childRows) + + return true + } + + for { + select { + case <-timer.C: + cancel() + t.Error("timeout waiting for postgres snapshot sync of identity-only tables") + return + case <-ticker.C: + if validation() { + return + } + } + } + } + + t.Run("bulk ingest", func(t *testing.T) { + run("bulk", withBulkIngestionEnabled()) + }) + t.Run("batch writer", func(t *testing.T) { + run("batch") + }) +} + +// Test_SnapshotToPostgres_IdentityAndGeneratedColumns verifies that identity +// columns have their values preserved while generated (stored) columns are +// correctly excluded from inserts and recomputed by the target database. +func Test_SnapshotToPostgres_IdentityAndGeneratedColumns(t *testing.T) { + if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" { + t.Skip("skipping integration test...") + } + + var snapshotPGURL string + pgcleanup, err := testcontainers.SetupPostgresContainer(context.Background(), &snapshotPGURL, testcontainers.Postgres14, "config/postgresql.conf") + require.NoError(t, err) + defer pgcleanup() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + run := func(suffix string, opts ...option) { + testTable := fmt.Sprintf("identity_generated_%s", suffix) + + // Table with both an identity column (id) and a generated stored column (display_name). + // The identity value must be preserved; the generated column must be excluded + // from inserts and recomputed on the target. + execQueryWithURL(t, ctx, snapshotPGURL, fmt.Sprintf( + `CREATE TABLE %s( + id INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + name TEXT NOT NULL, + display_name TEXT GENERATED ALWAYS AS ('item_' || name) STORED + )`, testTable)) + execQueryWithURL(t, ctx, snapshotPGURL, fmt.Sprintf( + `INSERT INTO %s(id, name) VALUES (10, 'alpha'),(20, 'beta'),(30, 'gamma')`, testTable)) + + cfg := &stream.Config{ + Listener: testPostgresListenerCfgWithSnapshot(snapshotPGURL, targetPGURL, []string{"*.*"}), + Processor: testPostgresProcessorCfg(opts...), + } + initStream(t, ctx, snapshotPGURL) + runSnapshot(t, ctx, cfg) + + targetConn, err := pglib.NewConn(ctx, targetPGURL) + require.NoError(t, err) + + timer := time.NewTimer(20 * time.Second) + defer timer.Stop() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + validation := func() bool { + rows := getTestTableColumns(t, ctx, targetConn, + fmt.Sprintf("select id, name, display_name from %s order by id", testTable), withGeneratedColumn) + if len(rows) != 3 { + return false + } + + want := []*testTableColumn{ + {id: 10, name: "alpha", username: "item_alpha"}, + {id: 20, name: "beta", username: "item_beta"}, + {id: 30, name: "gamma", username: "item_gamma"}, + } + require.Equal(t, want, rows) + return true + } + + for { + select { + case <-timer.C: + cancel() + t.Error("timeout waiting for postgres snapshot sync of identity+generated table") + return + case <-ticker.C: + if validation() { + return + } + } + } + } + + t.Run("bulk ingest", func(t *testing.T) { + run("bulk", withBulkIngestionEnabled()) + }) + t.Run("batch writer", func(t *testing.T) { + run("batch") + }) +} diff --git a/pkg/wal/processor/injector/wal_injector.go b/pkg/wal/processor/injector/wal_injector.go index 1525051e..66af4123 100644 --- a/pkg/wal/processor/injector/wal_injector.go +++ b/pkg/wal/processor/injector/wal_injector.go @@ -139,11 +139,11 @@ func (in *Injector) Name() string { } func (in *Injector) Close() error { + var querierErr error if in.querier != nil { - in.querier.Close(context.Background()) + querierErr = in.querier.Close(context.Background()) } - - return nil + return errors.Join(in.processor.Close(), querierErr) } func (in *Injector) updateTableCache(ddlEvent *wal.DDLEvent) { diff --git a/pkg/wal/processor/injector/wal_injector_test.go b/pkg/wal/processor/injector/wal_injector_test.go index e5332f36..1f4e8577 100644 --- a/pkg/wal/processor/injector/wal_injector_test.go +++ b/pkg/wal/processor/injector/wal_injector_test.go @@ -483,6 +483,57 @@ func TestInjector_inject(t *testing.T) { } } +func TestInjector_Close(t *testing.T) { + t.Parallel() + + t.Run("propagates close to processor and querier", func(t *testing.T) { + t.Parallel() + + processorClosed := false + querierClosed := false + + i := &Injector{ + logger: loglib.NewNoopLogger(), + processor: &mocks.Processor{ + ProcessWALEventFn: func(_ context.Context, _ *wal.Event) error { return nil }, + CloseFn: func() error { + processorClosed = true + return nil + }, + }, + querier: &pgmocks.Querier{ + CloseFn: func(_ context.Context) error { + querierClosed = true + return nil + }, + }, + } + + err := i.Close() + require.NoError(t, err) + require.True(t, processorClosed, "Close must propagate to the wrapped processor") + require.True(t, querierClosed, "Close must propagate to the querier") + }) + + t.Run("returns joined errors", func(t *testing.T) { + t.Parallel() + + i := &Injector{ + logger: loglib.NewNoopLogger(), + processor: &mocks.Processor{ + ProcessWALEventFn: func(_ context.Context, _ *wal.Event) error { return nil }, + CloseFn: func() error { return errTest }, + }, + querier: &pgmocks.Querier{ + CloseFn: func(_ context.Context) error { return errTest }, + }, + } + + err := i.Close() + require.Error(t, err) + }) +} + func TestInjector_injectColumnIDs(t *testing.T) { t.Parallel() diff --git a/pkg/wal/processor/postgres/postgres_schema_observer.go b/pkg/wal/processor/postgres/postgres_schema_observer.go index ffad7e8c..37b592bb 100644 --- a/pkg/wal/processor/postgres/postgres_schema_observer.go +++ b/pkg/wal/processor/postgres/postgres_schema_observer.go @@ -183,7 +183,7 @@ func (o *pgSchemaObserver) updateColumnSequences(tables []wal.DDLObject) { const generatedTableColumnsQuery = `SELECT attname FROM pg_attribute WHERE attnum > 0 AND attrelid = (SELECT c.oid FROM pg_class c JOIN pg_namespace n ON c.relnamespace=n.oid WHERE c.relname=$1 and n.nspname=$2) - AND (attgenerated != '' OR attidentity != '')` + AND attgenerated != ''` func (o *pgSchemaObserver) queryGeneratedColumnNames(ctx context.Context, schemaName, tableName string) (map[string]struct{}, error) { columnNames := map[string]struct{}{}